From 8fc1631deb223682d60eaf18c87b0abc42db5214 Mon Sep 17 00:00:00 2001 From: Antonin Houska Date: Tue, 1 Apr 2025 13:48:57 +0200 Subject: [PATCH 4/9] Add CONCURRENTLY option to REPACK command. The REPACK command copies the relation data into a new file, creates new indexes and eventually swaps the files. To make sure that the old file does not change during the copying, the relation is locked in an exclusive mode, which prevents applications from both reading and writing. (To keep the data consistent, we'd only need to prevent the applications from writing, but even reading needs to be blocked before we can swap the files - otherwise some applications could continue using the old file. Since we cannot get stronger lock without releasing the weaker one first, we acquire the exclusive lock in the beginning and keep it till the end of the processing.) This patch introduces an alternative workflow, which only requires the exclusive lock when the relation (and index) files are being swapped. (Supposedly, the swapping should be pretty fast.) On the other hand, when we copy the data to the new file, we allow applications to read from the relation and even write into it. First, we scan the relation using a "historic snapshot", and insert all the tuples satisfying this snapshot into the new file. Second, logical decoding is used to capture the data changes done by applications during the copying (i.e. changes that do not satisfy the historic snapshot mentioned above), and those are applied to the new file before we acquire the exclusive lock we need to swap the files. (Of course, more data changes can take place while we are waiting for the lock - these will be applied to the new file after we have acquired the lock, before we swap the files.) Since the logical decoding system, during its startup, waits until all the transactions which already have XID assigned have finished, there is a risk of deadlock if a transaction that already changed anything in the database tries to acquire a conflicting lock on the table REPACK CONCURRENTLY is working on. As an example, consider transaction running CREATE INDEX command on the table that is being REPACKed CONCURRENTLY. On the other hand, DML commands (INSERT, UPDATE, DELETE) are not a problem as their lock does not conflict with REPACK CONCURRENTLY. The current approach is that we accept the risk. If we tried to avoid it, it'd be necessary to unlock the table before the logical decoding is setup and lock it again afterwards. Such temporary unlocking would imply re-checking if the table still meets all the requirements for REPACK CONCURRENTLY. Like the existing implementation of REPACK, the variant with the CONCURRENTLY option also requires an extra space for the new relation and index files (which coexist with the old files for some time). In addition, the CONCURRENTLY option might introduce a lag in releasing WAL segments for archiving / recycling. This is due to the decoding of the data changes done by applications concurrently. When copying the table contents into the new file, we check the lag periodically. If it exceeds the size of WAL segment, we decode all the available WAL before resuming the copying. (Of course, the changes are not applied until the whole table contents is copied.) A background worker might be a better approach for the decoding - let's consider implementing it in the future. --- doc/src/sgml/monitoring.sgml | 65 +- doc/src/sgml/ref/repack.sgml | 116 +- src/Makefile | 1 + src/backend/access/heap/heapam_handler.c | 145 +- src/backend/access/heap/heapam_visibility.c | 30 +- src/backend/access/heap/rewriteheap.c | 6 +- src/backend/access/transam/xact.c | 11 +- src/backend/catalog/index.c | 43 +- src/backend/catalog/system_views.sql | 30 +- src/backend/commands/cluster.c | 1818 +++++++++++++++-- src/backend/commands/matview.c | 2 +- src/backend/commands/tablecmds.c | 1 + src/backend/commands/vacuum.c | 12 +- src/backend/meson.build | 1 + src/backend/parser/gram.y | 17 +- src/backend/replication/logical/decode.c | 24 + src/backend/replication/logical/snapbuild.c | 20 + .../replication/pgoutput_repack/Makefile | 32 + .../replication/pgoutput_repack/meson.build | 18 + .../pgoutput_repack/pgoutput_repack.c | 288 +++ src/backend/storage/ipc/ipci.c | 1 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/cache/relcache.c | 1 + src/backend/utils/time/snapmgr.c | 3 +- src/bin/psql/tab-complete.in.c | 25 +- src/include/access/heapam.h | 4 + src/include/access/tableam.h | 10 + src/include/catalog/index.h | 3 + src/include/commands/cluster.h | 87 +- src/include/commands/progress.h | 17 +- src/include/nodes/parsenodes.h | 1 + src/include/replication/snapbuild.h | 1 + src/include/storage/lockdefs.h | 4 +- src/include/storage/lwlocklist.h | 1 + src/include/utils/snapmgr.h | 2 + src/test/regress/expected/rules.out | 29 +- src/tools/pgindent/typedefs.list | 4 + 37 files changed, 2611 insertions(+), 263 deletions(-) create mode 100644 src/backend/replication/pgoutput_repack/Makefile create mode 100644 src/backend/replication/pgoutput_repack/meson.build create mode 100644 src/backend/replication/pgoutput_repack/pgoutput_repack.c diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 0a6229c391a..e385a55272b 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -5835,14 +5835,35 @@ FROM pg_stat_get_backend_idset() AS backendid; - heap_tuples_written bigint + heap_tuples_inserted bigint - Number of heap tuples written. + Number of heap tuples inserted. This counter only advances when the phase is seq scanning heap, - index scanning heap - or writing new heap. + index scanning heap, + writing new heap + or catch-up. + + + + + + heap_tuples_updated bigint + + + Number of heap tuples updated. + This counter only advances when the phase is catch-up. + + + + + + heap_tuples_deleted bigint + + + Number of heap tuples deleted. + This counter only advances when the phase is catch-up. @@ -6058,14 +6079,35 @@ FROM pg_stat_get_backend_idset() AS backendid; - heap_tuples_written bigint + heap_tuples_inserted bigint - Number of heap tuples written. + Number of heap tuples inserted. This counter only advances when the phase is seq scanning heap, - index scanning heap - or writing new heap. + index scanning heap, + writing new heap + or catch-up. + + + + + + heap_tuples_updated bigint + + + Number of heap tuples updated. + This counter only advances when the phase is catch-up. + + + + + + heap_tuples_deleted bigint + + + Number of heap tuples deleted. + This counter only advances when the phase is catch-up. @@ -6146,6 +6188,13 @@ FROM pg_stat_get_backend_idset() AS backendid; REPACK is currently writing the new heap. + + catch-up + + REPACK is currently processing the DML commands that + other transactions executed during any of the preceding phase. + + swapping relation files diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml index 84f3c3e3f2b..9ee640e3517 100644 --- a/doc/src/sgml/ref/repack.sgml +++ b/doc/src/sgml/ref/repack.sgml @@ -22,6 +22,7 @@ PostgreSQL documentation REPACK [ ( option [, ...] ) ] [ table_name [ USING INDEXindex_name ] ] +REPACK [ ( option [, ...] ) ] CONCURRENTLY table_name [ USING INDEXindex_name ] where option can be one of: @@ -48,7 +49,8 @@ REPACK [ ( option [, ...] ) ] [ MAINTAIN privilege on. This form of REPACK cannot be executed inside a transaction - block. + block. Also, this form is not allowed if + the CONCURRENTLY option is used. @@ -61,7 +63,8 @@ REPACK [ ( option [, ...] ) ] [ ACCESS EXCLUSIVE lock is acquired on it. This prevents any other database operations (both reads and writes) from operating on the table until the REPACK - is finished. + is finished. If you want to keep the table accessible during the repacking, + consider using the CONCURRENTLY option. @@ -160,6 +163,115 @@ REPACK [ ( option [, ...] ) ] [ + + CONCURRENTLY + + + Allow other transactions to use the table while it is being repacked. + + + + Internally, REPACK copies the contents of the table + (ignoring dead tuples) into a new file, sorted by the specified index, + and also creates a new file for each index. Then it swaps the old and + new files for the table and all the indexes, and deletes the old + files. The ACCESS EXCLUSIVE lock is needed to make + sure that the old files do not change during the processing because the + changes would get lost due to the swap. + + + + With the CONCURRENTLY option, the ACCESS + EXCLUSIVE lock is only acquired to swap the table and index + files. The data changes that took place during the creation of the new + table and index files are captured using logical decoding + () and applied before + the ACCESS EXCLUSIVE lock is requested. Thus the lock + is typically held only for the time needed to swap the files, which + should be pretty short. + + + + Note that REPACK with the + the CONCURRENTLY option does not try to order the + rows inserted into the table after the repacking started. Also + note REPACK might fail to complete due to DDL + commands executed on the table by other transactions during the + repacking. + + + + + In addition to the temporary space requirements explained in + , + the CONCURRENTLY option can add to the usage of + temporary space a bit more. The reason is that other transactions can + perform DML operations which cannot be applied to the new file until + REPACK has copied all the tuples from the old + file. Thus the tuples inserted into the old file during the copying are + also stored in separately in a temporary file, so they can eventually + be applied to the new file. + + + + Furthermore, the data changes performed during the copying are + extracted from write-ahead log (WAL), and + this extraction (decoding) only takes place when certain amount of WAL + has been written. Therefore, WAL removal can be delayed by this + threshold. Currently the threshold is equal to the value of + the wal_segment_size + configuration parameter. + + + + + The CONCURRENTLY option cannot be used in the + following cases: + + + + + The table is UNLOGGED. + + + + + + The table is partitioned. + + + + + + The table is a system catalog or a TOAST table. + + + + + + REPACK is executed inside a transaction block. + + + + + + The wal_level + configuration parameter is less than logical. + + + + + + The max_replication_slots + configuration parameter does not allow for creation of an additional + replication slot. + + + + + + + VERBOSE diff --git a/src/Makefile b/src/Makefile index 2f31a2f20a7..b18c9a14ffa 100644 --- a/src/Makefile +++ b/src/Makefile @@ -23,6 +23,7 @@ SUBDIRS = \ interfaces \ backend/replication/libpqwalreceiver \ backend/replication/pgoutput \ + backend/replication/pgoutput_repack \ fe_utils \ bin \ pl \ diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 18e349c3466..371afa6ad59 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -33,6 +33,7 @@ #include "catalog/index.h" #include "catalog/storage.h" #include "catalog/storage_xlog.h" +#include "commands/cluster.h" #include "commands/progress.h" #include "executor/executor.h" #include "miscadmin.h" @@ -53,6 +54,9 @@ static void reform_and_rewrite_tuple(HeapTuple tuple, static bool SampleHeapTupleVisible(TableScanDesc scan, Buffer buffer, HeapTuple tuple, OffsetNumber tupoffset); +static HeapTuple accept_tuple_for_concurrent_copy(HeapTuple tuple, + Snapshot snapshot, + Buffer buffer); static BlockNumber heapam_scan_get_blocks_done(HeapScanDesc hscan); @@ -685,6 +689,8 @@ static void heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, Relation OldIndex, bool use_sort, TransactionId OldestXmin, + Snapshot snapshot, + LogicalDecodingContext *decoding_ctx, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, @@ -705,6 +711,8 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, bool *isnull; BufferHeapTupleTableSlot *hslot; BlockNumber prev_cblock = InvalidBlockNumber; + bool concurrent = snapshot != NULL; + XLogRecPtr end_of_wal_prev = GetFlushRecPtr(NULL); /* Remember if it's a system catalog */ is_system_catalog = IsSystemRelation(OldHeap); @@ -783,8 +791,10 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, for (;;) { HeapTuple tuple; + bool tuple_copied = false; Buffer buf; bool isdead; + HTSV_Result vis; CHECK_FOR_INTERRUPTS(); @@ -839,7 +849,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, LockBuffer(buf, BUFFER_LOCK_SHARE); - switch (HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf)) + switch ((vis = HeapTupleSatisfiesVacuum(tuple, OldestXmin, buf))) { case HEAPTUPLE_DEAD: /* Definitely dead */ @@ -855,14 +865,15 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, case HEAPTUPLE_INSERT_IN_PROGRESS: /* - * Since we hold exclusive lock on the relation, normally the - * only way to see this is if it was inserted earlier in our - * own transaction. However, it can happen in system + * As long as we hold exclusive lock on the relation, normally + * the only way to see this is if it was inserted earlier in + * our own transaction. However, it can happen in system * catalogs, since we tend to release write lock before commit - * there. Give a warning if neither case applies; but in any - * case we had better copy it. + * there. Also, there's no exclusive lock during concurrent + * processing. Give a warning if neither case applies; but in + * any case we had better copy it. */ - if (!is_system_catalog && + if (!is_system_catalog && !concurrent && !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data))) elog(WARNING, "concurrent insert in progress within table \"%s\"", RelationGetRelationName(OldHeap)); @@ -874,7 +885,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, /* * Similar situation to INSERT_IN_PROGRESS case. */ - if (!is_system_catalog && + if (!is_system_catalog && !concurrent && !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetUpdateXid(tuple->t_data))) elog(WARNING, "concurrent delete in progress within table \"%s\"", RelationGetRelationName(OldHeap)); @@ -888,8 +899,6 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, break; } - LockBuffer(buf, BUFFER_LOCK_UNLOCK); - if (isdead) { *tups_vacuumed += 1; @@ -900,9 +909,47 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, *tups_vacuumed += 1; *tups_recently_dead -= 1; } + + LockBuffer(buf, BUFFER_LOCK_UNLOCK); continue; } + if (concurrent) + { + /* + * Ignore concurrent changes now, they'll be processed later via + * logical decoding. + * + * INSERT_IN_PROGRESS is rejected right away because our snapshot + * should represent a point in time which should precede (or be + * equal to) the state of transactions as it was when the + * "SatisfiesVacuum" test was performed. Thus + * accept_tuple_for_concurrent_copy() should not consider the + * tuple inserted. + */ + if (vis == HEAPTUPLE_INSERT_IN_PROGRESS) + tuple = NULL; + else + tuple = accept_tuple_for_concurrent_copy(tuple, snapshot, + buf); + /* Tuple not suitable for the new heap? */ + if (tuple == NULL) + { + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + continue; + } + + /* Remember that we have to free the tuple eventually. */ + tuple_copied = true; + } + + /* + * In the concurrent case, we have a copy of the tuple, so we don't + * worry whether the source tuple will be deleted / updated after we + * release the lock. + */ + LockBuffer(buf, BUFFER_LOCK_UNLOCK); + *num_tuples += 1; if (tuplesort != NULL) { @@ -919,7 +966,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, { const int ct_index[] = { PROGRESS_REPACK_HEAP_TUPLES_SCANNED, - PROGRESS_REPACK_HEAP_TUPLES_WRITTEN + PROGRESS_REPACK_HEAP_TUPLES_INSERTED }; int64 ct_val[2]; @@ -934,6 +981,33 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, ct_val[1] = *num_tuples; pgstat_progress_update_multi_param(2, ct_index, ct_val); } + if (tuple_copied) + heap_freetuple(tuple); + + /* + * Process the WAL produced by the load, as well as by other + * transactions, so that the replication slot can advance and WAL does + * not pile up. Use wal_segment_size as a threshold so that we do not + * introduce the decoding overhead too often. + * + * Of course, we must not apply the changes until the initial load has + * completed. + * + * Note that our insertions into the new table should not be decoded + * as we (intentionally) do not write the logical decoding specific + * information to WAL. + */ + if (concurrent) + { + XLogRecPtr end_of_wal; + + end_of_wal = GetFlushRecPtr(NULL); + if ((end_of_wal - end_of_wal_prev) > wal_segment_size) + { + repack_decode_concurrent_changes(decoding_ctx, end_of_wal); + end_of_wal_prev = end_of_wal; + } + } } if (indexScan != NULL) @@ -977,7 +1051,7 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, values, isnull, rwstate); /* Report n_tuples */ - pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_WRITTEN, + pgstat_progress_update_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, n_tuples); } @@ -2023,6 +2097,53 @@ heapam_scan_get_blocks_done(HeapScanDesc hscan) return blocks_done; } +/* + * Return copy of 'tuple' if it has been inserted according to 'snapshot', or + * NULL if the insertion took place in the future. If the tuple is already + * marked as deleted or updated by a transaction that 'snapshot' still + * considers running, clear the deletion / update XID in the header of the + * copied tuple. This way the returned tuple is suitable for insertion into + * the new heap. + */ +static HeapTuple +accept_tuple_for_concurrent_copy(HeapTuple tuple, Snapshot snapshot, + Buffer buffer) +{ + HeapTuple result; + + Assert(snapshot->snapshot_type == SNAPSHOT_MVCC); + + /* + * First, check if the tuple insertion is visible by our snapshot. + */ + if (!HeapTupleMVCCInserted(tuple, snapshot, buffer)) + return NULL; + + result = heap_copytuple(tuple); + + /* + * If the tuple was deleted / updated but our snapshot still sees it, we + * need to keep it. In that case, clear the information that indicates the + * deletion / update. Otherwise the tuple chain would stay incomplete (as + * we will reject the new tuple above), and the delete / update would fail + * if executed later during logical decoding. + */ + if (TransactionIdIsNormal(HeapTupleHeaderGetRawXmax(result->t_data)) && + HeapTupleMVCCNotDeleted(result, snapshot, buffer)) + { + /* TODO More work needed here? */ + result->t_data->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderSetXmax(result->t_data, 0); + } + + /* + * Accept the tuple even if our snapshot considers it deleted - older + * snapshots can still see the tuple, while the decoded transactions + * should not try to update / delete it again. + */ + return result; +} + /* ------------------------------------------------------------------------ * Miscellaneous callbacks for the heap AM diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index 05f6946fe60..a46e1812b21 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -955,13 +955,14 @@ HeapTupleSatisfiesDirty(HeapTuple htup, Snapshot snapshot, * did TransactionIdIsInProgress in each call --- to no avail, as long as the * inserting/deleting transaction was still running --- which was more cycles * and more contention on ProcArrayLock. + * + * The checks are split into two functions, HeapTupleMVCCInserted() and + * HeapTupleMVCCNotDeleted(), because they are also useful separately. */ static bool HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, Buffer buffer) { - HeapTupleHeader tuple = htup->t_data; - /* * Assert that the caller has registered the snapshot. This function * doesn't care about the registration as such, but in general you @@ -974,6 +975,20 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, Assert(ItemPointerIsValid(&htup->t_self)); Assert(htup->t_tableOid != InvalidOid); + return HeapTupleMVCCInserted(htup, snapshot, buffer) && + HeapTupleMVCCNotDeleted(htup, snapshot, buffer); +} + +/* + * HeapTupleMVCCInserted + * True iff heap tuple was successfully inserted for the given MVCC + * snapshot. + */ +bool +HeapTupleMVCCInserted(HeapTuple htup, Snapshot snapshot, Buffer buffer) +{ + HeapTupleHeader tuple = htup->t_data; + if (!HeapTupleHeaderXminCommitted(tuple)) { if (HeapTupleHeaderXminInvalid(tuple)) @@ -1082,6 +1097,17 @@ HeapTupleSatisfiesMVCC(HeapTuple htup, Snapshot snapshot, } /* by here, the inserting transaction has committed */ + return true; +} + +/* + * HeapTupleMVCCNotDeleted + * True iff heap tuple was not deleted for the given MVCC snapshot. + */ +bool +HeapTupleMVCCNotDeleted(HeapTuple htup, Snapshot snapshot, Buffer buffer) +{ + HeapTupleHeader tuple = htup->t_data; if (tuple->t_infomask & HEAP_XMAX_INVALID) /* xid invalid or aborted */ return true; diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index e6d2b5fced1..6aa2ed214f2 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -617,9 +617,9 @@ raw_heap_insert(RewriteState state, HeapTuple tup) int options = HEAP_INSERT_SKIP_FSM; /* - * While rewriting the heap for VACUUM FULL / CLUSTER, make sure data - * for the TOAST table are not logically decoded. The main heap is - * WAL-logged as XLOG FPI records, which are not logically decoded. + * While rewriting the heap for REPACK, make sure data for the TOAST + * table are not logically decoded. The main heap is WAL-logged as + * XLOG FPI records, which are not logically decoded. */ options |= HEAP_INSERT_NO_LOGICAL; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b885513f765..23f2de587a1 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -215,6 +215,7 @@ typedef struct TransactionStateData bool parallelChildXact; /* is any parent transaction parallel? */ bool chain; /* start a new block after this one */ bool topXidLogged; /* for a subxact: is top-level XID logged? */ + bool internal; /* for a subxact: launched internally? */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -4723,6 +4724,7 @@ BeginInternalSubTransaction(const char *name) /* Normal subtransaction start */ PushTransaction(); s = CurrentTransactionState; /* changed by push */ + s->internal = true; /* * Savepoint names, like the TransactionState block itself, live @@ -5239,7 +5241,13 @@ AbortSubTransaction(void) LWLockReleaseAll(); pgstat_report_wait_end(); - pgstat_progress_end_command(); + + /* + * Internal subtransacion might be used by an user command, in which case + * the command outlives the subtransaction. + */ + if (!s->internal) + pgstat_progress_end_command(); pgaio_error_cleanup(); @@ -5456,6 +5464,7 @@ PushTransaction(void) s->parallelModeLevel = 0; s->parallelChildXact = (p->parallelModeLevel != 0 || p->parallelChildXact); s->topXidLogged = false; + s->internal = false; CurrentTransactionState = s; diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c index 466cf0fdef6..c70521d1d54 100644 --- a/src/backend/catalog/index.c +++ b/src/backend/catalog/index.c @@ -1418,22 +1418,7 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId, for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++) opclassOptions[i] = get_attoptions(oldIndexId, i + 1); - /* Extract statistic targets for each attribute */ - stattargets = palloc0_array(NullableDatum, newInfo->ii_NumIndexAttrs); - for (int i = 0; i < newInfo->ii_NumIndexAttrs; i++) - { - HeapTuple tp; - Datum dat; - - tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(oldIndexId), Int16GetDatum(i + 1)); - if (!HeapTupleIsValid(tp)) - elog(ERROR, "cache lookup failed for attribute %d of relation %u", - i + 1, oldIndexId); - dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull); - ReleaseSysCache(tp); - stattargets[i].value = dat; - stattargets[i].isnull = isnull; - } + stattargets = get_index_stattargets(oldIndexId, newInfo); /* * Now create the new index. @@ -1472,6 +1457,32 @@ index_concurrently_create_copy(Relation heapRelation, Oid oldIndexId, return newIndexId; } +NullableDatum * +get_index_stattargets(Oid indexid, IndexInfo *indInfo) +{ + NullableDatum *stattargets; + + /* Extract statistic targets for each attribute */ + stattargets = palloc0_array(NullableDatum, indInfo->ii_NumIndexAttrs); + for (int i = 0; i < indInfo->ii_NumIndexAttrs; i++) + { + HeapTuple tp; + Datum dat; + bool isnull; + + tp = SearchSysCache2(ATTNUM, ObjectIdGetDatum(indexid), Int16GetDatum(i + 1)); + if (!HeapTupleIsValid(tp)) + elog(ERROR, "cache lookup failed for attribute %d of relation %u", + i + 1, indexid); + dat = SysCacheGetAttr(ATTNUM, tp, Anum_pg_attribute_attstattarget, &isnull); + ReleaseSysCache(tp); + stattargets[i].value = dat; + stattargets[i].isnull = isnull; + } + + return stattargets; +} + /* * index_concurrently_build * diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5de46bcac52..70265e5e701 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1249,16 +1249,17 @@ CREATE VIEW pg_stat_progress_cluster AS WHEN 2 THEN 'index scanning heap' WHEN 3 THEN 'sorting tuples' WHEN 4 THEN 'writing new heap' - WHEN 5 THEN 'swapping relation files' - WHEN 6 THEN 'rebuilding index' - WHEN 7 THEN 'performing final cleanup' + -- 5 is 'catch-up', but that should not appear here. + WHEN 6 THEN 'swapping relation files' + WHEN 7 THEN 'rebuilding index' + WHEN 8 THEN 'performing final cleanup' END AS phase, CAST(S.param3 AS oid) AS cluster_index_relid, S.param4 AS heap_tuples_scanned, S.param5 AS heap_tuples_written, - S.param6 AS heap_blks_total, - S.param7 AS heap_blks_scanned, - S.param8 AS index_rebuild_count + S.param8 AS heap_blks_total, + S.param9 AS heap_blks_scanned, + S.param10 AS index_rebuild_count FROM pg_stat_get_progress_info('CLUSTER') AS S LEFT JOIN pg_database D ON S.datid = D.oid; @@ -1275,16 +1276,19 @@ CREATE VIEW pg_stat_progress_repack AS WHEN 2 THEN 'index scanning heap' WHEN 3 THEN 'sorting tuples' WHEN 4 THEN 'writing new heap' - WHEN 5 THEN 'swapping relation files' - WHEN 6 THEN 'rebuilding index' - WHEN 7 THEN 'performing final cleanup' + WHEN 5 THEN 'catch-up' + WHEN 6 THEN 'swapping relation files' + WHEN 7 THEN 'rebuilding index' + WHEN 8 THEN 'performing final cleanup' END AS phase, CAST(S.param3 AS oid) AS repack_index_relid, S.param4 AS heap_tuples_scanned, - S.param5 AS heap_tuples_written, - S.param6 AS heap_blks_total, - S.param7 AS heap_blks_scanned, - S.param8 AS index_rebuild_count + S.param5 AS heap_tuples_inserted, + S.param6 AS heap_tuples_updated, + S.param7 AS heap_tuples_deleted, + S.param8 AS heap_blks_total, + S.param9 AS heap_blks_scanned, + S.param10 AS index_rebuild_count FROM pg_stat_get_progress_info('REPACK') AS S LEFT JOIN pg_database D ON S.datid = D.oid; diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 67625d52f12..4d08a28ff7e 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -25,6 +25,10 @@ #include "access/toast_internals.h" #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xloginsert.h" +#include "access/xlogutils.h" #include "catalog/catalog.h" #include "catalog/dependency.h" #include "catalog/heap.h" @@ -32,6 +36,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/pg_am.h" +#include "catalog/pg_control.h" #include "catalog/pg_inherits.h" #include "catalog/toasting.h" #include "commands/cluster.h" @@ -39,10 +44,15 @@ #include "commands/progress.h" #include "commands/tablecmds.h" #include "commands/vacuum.h" +#include "executor/executor.h" #include "miscadmin.h" #include "optimizer/optimizer.h" #include "pgstat.h" +#include "replication/decode.h" +#include "replication/logical.h" +#include "replication/snapbuild.h" #include "storage/bufmgr.h" +#include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/predicate.h" #include "utils/acl.h" @@ -76,16 +86,46 @@ typedef struct ((cmd) == CLUSTER_COMMAND_REPACK ? \ "repack" : "vacuum")) +/* + * The following definitions are used for concurrent processing. + */ + +/* + * The locators are used to avoid logical decoding of data that we do not need + * for our table. + */ +RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid}; +RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid}; + +/* + * Everything we need to call ExecInsertIndexTuples(). + */ +typedef struct IndexInsertState +{ + ResultRelInfo *rri; + EState *estate; + + Relation ident_index; +} IndexInsertState; + +/* The WAL segment being decoded. */ +static XLogSegNo repack_current_segment = 0; + static void cluster_multiple_rels(List *rtcs, ClusterParams *params, - ClusterCommand cmd); + ClusterCommand cmd, LOCKMODE lockmode, + bool isTopLevel); static bool cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, - ClusterCommand cmd, int options); + ClusterCommand cmd, LOCKMODE lmode, + int options); +static void check_repack_concurrently_requirements(Relation rel); static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, - ClusterCommand cmd); + ClusterCommand cmd, bool concurrent, Oid userid); static void copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, + Snapshot snapshot, LogicalDecodingContext *decoding_ctx, bool verbose, ClusterCommand cmd, bool *pSwapToastByContent, - TransactionId *pFreezeXid, MultiXactId *pCutoffMulti); + TransactionId *pFreezeXid, + MultiXactId *pCutoffMulti); static List *get_tables_to_cluster(MemoryContext cluster_context); static List *get_tables_to_repack(MemoryContext repack_context); static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context, @@ -93,8 +133,53 @@ static List *get_tables_to_cluster_partitioned(MemoryContext cluster_context, ClusterCommand cmd); static bool cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd); +static void begin_concurrent_repack(Relation rel); +static void end_concurrent_repack(void); +static LogicalDecodingContext *setup_logical_decoding(Oid relid, + const char *slotname, + TupleDesc tupdesc); +static HeapTuple get_changed_tuple(char *change); +static void apply_concurrent_changes(RepackDecodingState *dstate, + Relation rel, ScanKey key, int nkeys, + IndexInsertState *iistate); +static void apply_concurrent_insert(Relation rel, ConcurrentChange *change, + HeapTuple tup, IndexInsertState *iistate, + TupleTableSlot *index_slot); +static void apply_concurrent_update(Relation rel, HeapTuple tup, + HeapTuple tup_target, + ConcurrentChange *change, + IndexInsertState *iistate, + TupleTableSlot *index_slot); +static void apply_concurrent_delete(Relation rel, HeapTuple tup_target, + ConcurrentChange *change); +static HeapTuple find_target_tuple(Relation rel, ScanKey key, int nkeys, + HeapTuple tup_key, + IndexInsertState *iistate, + TupleTableSlot *ident_slot, + IndexScanDesc *scan_p); +static void process_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal, + Relation rel_dst, + Relation rel_src, + ScanKey ident_key, + int ident_key_nentries, + IndexInsertState *iistate); +static IndexInsertState *get_index_insert_state(Relation relation, + Oid ident_index_id); +static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src, + int *nentries); +static void free_index_insert_state(IndexInsertState *iistate); +static void cleanup_logical_decoding(LogicalDecodingContext *ctx); +static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, + Relation cl_index, + LogicalDecodingContext *ctx, + bool swap_toast_by_content, + TransactionId frozenXid, + MultiXactId cutoffMulti); +static List *build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes); static Relation process_single_relation(RangeVar *relation, char *indexname, - ClusterCommand cmd, + ClusterCommand cmd, LOCKMODE lockmode, + bool isTopLevel, ClusterParams *params, Oid *indexOid_p); @@ -153,8 +238,9 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) if (stmt->relation != NULL) { rel = process_single_relation(stmt->relation, stmt->indexname, - CLUSTER_COMMAND_CLUSTER, ¶ms, - &indexOid); + CLUSTER_COMMAND_CLUSTER, + AccessExclusiveLock, isTopLevel, + ¶ms, &indexOid); if (rel == NULL) return; } @@ -204,7 +290,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) } /* Do the job. */ - cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_CLUSTER); + cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_CLUSTER, + AccessExclusiveLock, isTopLevel); /* Start a new transaction for the cleanup work. */ StartTransactionCommand(); @@ -221,8 +308,8 @@ cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel) * return. */ static void -cluster_multiple_rels(List *rtcs, ClusterParams *params, - ClusterCommand cmd) +cluster_multiple_rels(List *rtcs, ClusterParams *params, ClusterCommand cmd, + LOCKMODE lockmode, bool isTopLevel) { ListCell *lc; @@ -242,10 +329,10 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params, /* functions in indexes may want a snapshot set */ PushActiveSnapshot(GetTransactionSnapshot()); - rel = table_open(rtc->tableOid, AccessExclusiveLock); + rel = table_open(rtc->tableOid, lockmode); /* Process this table */ - cluster_rel(rel, rtc->indexOid, params, cmd); + cluster_rel(rel, rtc->indexOid, params, cmd, isTopLevel); /* cluster_rel closes the relation, but keeps lock */ PopActiveSnapshot(); @@ -269,12 +356,18 @@ cluster_multiple_rels(List *rtcs, ClusterParams *params, * instead of index order. This is the new implementation of VACUUM FULL, * and error messages should refer to the operation as VACUUM not CLUSTER. * + * Note that, in the concurrent case, the function releases the lock at some + * point, in order to get AccessExclusiveLock for the final steps (i.e. to + * swap the relation files). To make things simpler, the caller should expect + * OldHeap to be closed on return, regardless CLUOPT_CONCURRENT. (The + * AccessExclusiveLock is kept till the end of the transaction.) + * * 'cmd' indicates which commands is being executed. REPACK should be the only * caller of this function in the future. */ void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, - ClusterCommand cmd) + ClusterCommand cmd, bool isTopLevel) { Oid tableOid = RelationGetRelid(OldHeap); Oid save_userid; @@ -284,8 +377,34 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, bool recheck = ((params->options & CLUOPT_RECHECK) != 0); Relation index; const char *cmd_str = CLUSTER_COMMAND_STR(cmd); + bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0); + LOCKMODE lmode; + + /* + * Check that the correct lock is held. The lock mode is + * AccessExclusiveLock for normal processing and ShareUpdateExclusiveLock + * for concurrent processing (so that SELECT, INSERT, UPDATE and DELETE + * commands work, but cluster_rel() cannot be called concurrently for the + * same relation). + */ + lmode = !concurrent ? AccessExclusiveLock : ShareUpdateExclusiveLock; - Assert(CheckRelationLockedByMe(OldHeap, AccessExclusiveLock, false)); + /* There are specific requirements on concurrent processing. */ + if (concurrent) + { + /* + * Make sure we have no XID assigned, otherwise call of + * setup_logical_decoding() can cause a deadlock. + * + * The existence of transaction block actually does not imply that XID + * was already assigned, but it very likely is. We might want to check + * the result of GetCurrentTransactionIdIfAny() instead, but that + * would be less clear from user's perspective. + */ + PreventInTransactionBlock(isTopLevel, "REPACK CONCURRENTLY"); + + check_repack_concurrently_requirements(OldHeap); + } /* Check for user-requested abort. */ CHECK_FOR_INTERRUPTS(); @@ -331,7 +450,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, * to cluster a not-previously-clustered index. */ if (recheck) - if (!cluster_rel_recheck(OldHeap, indexOid, save_userid, cmd, + if (!cluster_rel_recheck(OldHeap, indexOid, save_userid, cmd, lmode, params->options)) goto out; @@ -350,6 +469,12 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot %s a shared catalog", cmd_str))); + /* + * The CONCURRENTLY case should have been rejected earlier because it does + * not support system catalogs. + */ + Assert(!(OldHeap->rd_rel->relisshared && concurrent)); + /* * Don't process temp tables of other backends ... their local buffer * manager is not going to cope. @@ -370,8 +495,7 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, if (OidIsValid(indexOid)) { /* verify the index is good and lock it */ - check_index_is_clusterable(OldHeap, indexOid, AccessExclusiveLock, - cmd); + check_index_is_clusterable(OldHeap, indexOid, lmode, cmd); /* also open it */ index = index_open(indexOid, NoLock); } @@ -388,7 +512,9 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, if (OldHeap->rd_rel->relkind == RELKIND_MATVIEW && !RelationIsPopulated(OldHeap)) { - relation_close(OldHeap, AccessExclusiveLock); + if (index) + index_close(index, lmode); + relation_close(OldHeap, lmode); goto out; } @@ -401,11 +527,35 @@ cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, * invalid, because we move tuples around. Promote them to relation * locks. Predicate locks on indexes will be promoted when they are * reindexed. + * + * During concurrent processing, the heap as well as its indexes stay in + * operation, so we postpone this step until they are locked using + * AccessExclusiveLock near the end of the processing. */ - TransferPredicateLocksToHeapRelation(OldHeap); + if (!concurrent) + TransferPredicateLocksToHeapRelation(OldHeap); /* rebuild_relation does all the dirty work */ - rebuild_relation(OldHeap, index, verbose, cmd); + PG_TRY(); + { + /* + * For concurrent processing, make sure that our logical decoding + * ignores data changes of other tables than the one we are + * processing. + */ + if (concurrent) + begin_concurrent_repack(OldHeap); + + rebuild_relation(OldHeap, index, verbose, cmd, concurrent, + save_userid); + } + PG_FINALLY(); + { + if (concurrent) + end_concurrent_repack(); + } + PG_END_TRY(); + /* rebuild_relation closes OldHeap, and index if valid */ out: @@ -424,14 +574,14 @@ out: */ static bool cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, - ClusterCommand cmd, int options) + ClusterCommand cmd, LOCKMODE lmode, int options) { Oid tableOid = RelationGetRelid(OldHeap); /* Check that the user still has privileges for the relation */ if (!cluster_is_permitted_for_relation(tableOid, userid, cmd)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } @@ -445,7 +595,7 @@ cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, */ if (RELATION_IS_OTHER_TEMP(OldHeap)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } @@ -456,7 +606,7 @@ cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, */ if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid))) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } @@ -467,7 +617,7 @@ cluster_rel_recheck(Relation OldHeap, Oid indexOid, Oid userid, if ((options & CLUOPT_RECHECK_ISCLUSTERED) != 0 && !get_index_isclustered(indexOid)) { - relation_close(OldHeap, AccessExclusiveLock); + relation_close(OldHeap, lmode); return false; } } @@ -611,19 +761,87 @@ mark_index_clustered(Relation rel, Oid indexOid, bool is_internal) table_close(pg_index, RowExclusiveLock); } +/* + * Check if the CONCURRENTLY option is legal for the relation. + */ +static void +check_repack_concurrently_requirements(Relation rel) +{ + char relpersistence, + replident; + Oid ident_idx; + + /* Data changes in system relations are not logically decoded. */ + if (IsCatalogRelation(rel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("REPACK CONCURRENTLY is not supported for catalog relations."))); + + /* + * reorderbuffer.c does not seem to handle processing of TOAST relation + * alone. + */ + if (IsToastRelation(rel)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("REPACK CONCURRENTLY is not supported for TOAST relations, unless the main relation is repacked too."))); + + relpersistence = rel->rd_rel->relpersistence; + if (relpersistence != RELPERSISTENCE_PERMANENT) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("REPACK CONCURRENTLY is only allowed for permanent relations."))); + + /* With NOTHING, WAL does not contain the old tuple. */ + replident = rel->rd_rel->relreplident; + if (replident == REPLICA_IDENTITY_NOTHING) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot repack relation \"%s\"", + RelationGetRelationName(rel)), + errhint("Relation \"%s\" has insufficient replication identity.", + RelationGetRelationName(rel)))); + + /* + * Identity index is not set if the replica identity is FULL, but PK might + * exist in such a case. + */ + ident_idx = RelationGetReplicaIndex(rel); + if (!OidIsValid(ident_idx) && OidIsValid(rel->rd_pkindex)) + ident_idx = rel->rd_pkindex; + if (!OidIsValid(ident_idx)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot process relation \"%s\"", + RelationGetRelationName(rel)), + (errhint("Relation \"%s\" has no identity index.", + RelationGetRelationName(rel))))); +} + /* * rebuild_relation: rebuild an existing relation in index or physical order * - * OldHeap: table to rebuild. + * OldHeap: table to rebuild. See cluster_rel() for comments on the required + * lock strength. + * * index: index to cluster by, or NULL to rewrite in physical order. * - * On entry, heap and index (if one is given) must be open, and - * AccessExclusiveLock held on them. - * On exit, they are closed, but locks on them are not released. + * On entry, heap and index (if one is given) must be open, and the + * appropriate lock held on them (AccessExclusiveLock for exclusive processing + * and ShareUpdateExclusiveLock for concurrent processing).. + * + * On exit, they are closed, but still locked with AccessExclusiveLock (The + * function handles the lock upgrade if 'concurrent' is true.) */ static void rebuild_relation(Relation OldHeap, Relation index, bool verbose, - ClusterCommand cmd) + ClusterCommand cmd, bool concurrent, Oid userid) { Oid tableOid = RelationGetRelid(OldHeap); Oid accessMethod = OldHeap->rd_rel->relam; @@ -631,21 +849,61 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, Oid OIDNewHeap; Relation NewHeap; char relpersistence; - bool is_system_catalog; bool swap_toast_by_content; TransactionId frozenXid; MultiXactId cutoffMulti; + NameData slotname; + LogicalDecodingContext *ctx = NULL; + Snapshot snapshot = NULL; +#if USE_ASSERT_CHECKING + LOCKMODE lmode; + + lmode = !concurrent ? AccessExclusiveLock : ShareUpdateExclusiveLock; + + Assert(CheckRelationLockedByMe(OldHeap, lmode, false) && + (index == NULL || CheckRelationLockedByMe(index, lmode, false))); +#endif + + if (concurrent) + { + TupleDesc tupdesc; + + /* + * REPACK CONCURRENTLY is not allowed in a transaction block, so this + * should never fire. + */ + Assert(GetTopTransactionIdIfAny() == InvalidTransactionId); + + /* + * A single backend should not execute multiple REPACK commands at a + * time, so use PID to make the slot unique. + */ + snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid); + + tupdesc = CreateTupleDescCopy(RelationGetDescr(OldHeap)); + + /* + * Prepare to capture the concurrent data changes. + * + * Note that this call waits for all transactions with XID already + * assigned to finish. If some of those transactions is waiting for a + * lock conflicting with ShareUpdateExclusiveLock on our table (e.g. + * it runs CREATE INDEX), we can end up in a deadlock. Not sure this + * risk is worth unlocking/locking the table (and its clustering + * index) and checking again if its still eligible for REPACK + * CONCURRENTLY. + */ + ctx = setup_logical_decoding(tableOid, NameStr(slotname), tupdesc); - Assert(CheckRelationLockedByMe(OldHeap, AccessExclusiveLock, false) && - (index == NULL || CheckRelationLockedByMe(index, AccessExclusiveLock, false))); + snapshot = SnapBuildInitialSnapshotForRepack(ctx->snapshot_builder); + } - if (index) + if (index && cmd == CLUSTER_COMMAND_CLUSTER) /* Mark the correct index as clustered */ mark_index_clustered(OldHeap, RelationGetRelid(index), true); /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; - is_system_catalog = IsSystemRelation(OldHeap); /* * Create the transient table that will receive the re-ordered data. @@ -661,30 +919,49 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, NewHeap = table_open(OIDNewHeap, NoLock); /* Copy the heap data into the new table in the desired order */ - copy_table_data(NewHeap, OldHeap, index, verbose, cmd, - &swap_toast_by_content, &frozenXid, &cutoffMulti); + copy_table_data(NewHeap, OldHeap, index, snapshot, ctx, verbose, + cmd, &swap_toast_by_content, &frozenXid, &cutoffMulti); + if (concurrent) + { + rebuild_relation_finish_concurrent(NewHeap, OldHeap, index, + ctx, swap_toast_by_content, + frozenXid, cutoffMulti); + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_FINAL_CLEANUP); + + /* Done with decoding. */ + FreeSnapshot(snapshot); + cleanup_logical_decoding(ctx); + ReplicationSlotRelease(); + ReplicationSlotDrop(NameStr(slotname), false); + } + else + { + bool is_system_catalog = IsSystemRelation(OldHeap); - /* Close relcache entries, but keep lock until transaction commit */ - table_close(OldHeap, NoLock); - if (index) - index_close(index, NoLock); + /* Close relcache entries, but keep lock until transaction commit */ + table_close(OldHeap, NoLock); + if (index) + index_close(index, NoLock); - /* - * Close the new relation so it can be dropped as soon as the storage is - * swapped. The relation is not visible to others, so no need to unlock it - * explicitly. - */ - table_close(NewHeap, NoLock); + /* + * Close the new relation so it can be dropped as soon as the storage + * is swapped. The relation is not visible to others, so no need to + * unlock it explicitly. + */ + table_close(NewHeap, NoLock); - /* - * Swap the physical files of the target and transient tables, then - * rebuild the target's indexes and throw away the transient table. - */ - finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, - swap_toast_by_content, false, true, - frozenXid, cutoffMulti, - relpersistence); + /* + * Swap the physical files of the target and transient tables, then + * rebuild the target's indexes and throw away the transient table. + */ + finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog, + swap_toast_by_content, false, true, true, + frozenXid, cutoffMulti, + relpersistence); + } } @@ -819,14 +1096,18 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, /* * Do the physical copying of table data. * + * 'snapshot' and 'decoding_ctx': see table_relation_copy_for_cluster(). Pass + * iff concurrent processing is required. + * * There are three output parameters: * *pSwapToastByContent is set true if toast tables must be swapped by content. * *pFreezeXid receives the TransactionId used as freeze cutoff point. * *pCutoffMulti receives the MultiXactId used as a cutoff point. */ static void -copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verbose, - ClusterCommand cmd, bool *pSwapToastByContent, +copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, + Snapshot snapshot, LogicalDecodingContext *decoding_ctx, + bool verbose, ClusterCommand cmd, bool *pSwapToastByContent, TransactionId *pFreezeXid, MultiXactId *pCutoffMulti) { Relation relRelation; @@ -845,6 +1126,7 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb const char *cmd_str = CLUSTER_COMMAND_STR(cmd); PGRUsage ru0; char *nspname; + bool concurrent = snapshot != NULL; pg_rusage_init(&ru0); @@ -948,8 +1230,48 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * provided, else plain seqscan. */ if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID) + { + ResourceOwner oldowner = NULL; + ResourceOwner resowner = NULL; + + /* + * In the CONCURRENT case, use a dedicated resource owner so we don't + * leave any additional locks behind us that we cannot release easily. + */ + if (concurrent) + { + Assert(CheckRelationLockedByMe(OldHeap, ShareUpdateExclusiveLock, + false)); + Assert(CheckRelationLockedByMe(OldIndex, ShareUpdateExclusiveLock, + false)); + + resowner = ResourceOwnerCreate(CurrentResourceOwner, + "plan_cluster_use_sort"); + oldowner = CurrentResourceOwner; + CurrentResourceOwner = resowner; + } + use_sort = plan_cluster_use_sort(RelationGetRelid(OldHeap), RelationGetRelid(OldIndex)); + + if (concurrent) + { + CurrentResourceOwner = oldowner; + + /* + * We are primarily concerned about locks, but if the planner + * happened to allocate any other resources, we should release + * them too because we're going to delete the whole resowner. + */ + ResourceOwnerRelease(resowner, RESOURCE_RELEASE_BEFORE_LOCKS, + false, false); + ResourceOwnerRelease(resowner, RESOURCE_RELEASE_LOCKS, + false, false); + ResourceOwnerRelease(resowner, RESOURCE_RELEASE_AFTER_LOCKS, + false, false); + ResourceOwnerDelete(resowner); + } + } else use_sort = false; @@ -981,7 +1303,9 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb * values (e.g. because the AM doesn't use freezing). */ table_relation_copy_for_cluster(OldHeap, NewHeap, OldIndex, use_sort, - cutoffs.OldestXmin, &cutoffs.FreezeLimit, + cutoffs.OldestXmin, snapshot, + decoding_ctx, + &cutoffs.FreezeLimit, &cutoffs.MultiXactCutoff, &num_tuples, &tups_vacuumed, &tups_recently_dead); @@ -990,7 +1314,11 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, bool verb *pFreezeXid = cutoffs.FreezeLimit; *pCutoffMulti = cutoffs.MultiXactCutoff; - /* Reset rd_toastoid just to be tidy --- it shouldn't be looked at again */ + /* + * Reset rd_toastoid just to be tidy --- it shouldn't be looked at again. + * In the CONCURRENTLY case, we need to set it again before applying the + * concurrent changes. + */ NewHeap->rd_toastoid = InvalidOid; num_pages = RelationGetNumberOfBlocks(NewHeap); @@ -1448,14 +1776,13 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool swap_toast_by_content, bool check_constraints, bool is_internal, + bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence) { ObjectAddress object; Oid mapped_tables[4]; - int reindex_flags; - ReindexParams reindex_params = {0}; int i; /* Report that we are now swapping relation files */ @@ -1481,39 +1808,47 @@ finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, if (is_system_catalog) CacheInvalidateCatalog(OIDOldHeap); - /* - * Rebuild each index on the relation (but not the toast table, which is - * all-new at this point). It is important to do this before the DROP - * step because if we are processing a system catalog that will be used - * during DROP, we want to have its indexes available. There is no - * advantage to the other order anyway because this is all transactional, - * so no chance to reclaim disk space before commit. We do not need a - * final CommandCounterIncrement() because reindex_relation does it. - * - * Note: because index_build is called via reindex_relation, it will never - * set indcheckxmin true for the indexes. This is OK even though in some - * sense we are building new indexes rather than rebuilding existing ones, - * because the new heap won't contain any HOT chains at all, let alone - * broken ones, so it can't be necessary to set indcheckxmin. - */ - reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE; - if (check_constraints) - reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS; + if (reindex) + { + int reindex_flags; + ReindexParams reindex_params = {0}; - /* - * Ensure that the indexes have the same persistence as the parent - * relation. - */ - if (newrelpersistence == RELPERSISTENCE_UNLOGGED) - reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED; - else if (newrelpersistence == RELPERSISTENCE_PERMANENT) - reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT; + /* + * Rebuild each index on the relation (but not the toast table, which + * is all-new at this point). It is important to do this before the + * DROP step because if we are processing a system catalog that will + * be used during DROP, we want to have its indexes available. There + * is no advantage to the other order anyway because this is all + * transactional, so no chance to reclaim disk space before commit. We + * do not need a final CommandCounterIncrement() because + * reindex_relation does it. + * + * Note: because index_build is called via reindex_relation, it will + * never set indcheckxmin true for the indexes. This is OK even + * though in some sense we are building new indexes rather than + * rebuilding existing ones, because the new heap won't contain any + * HOT chains at all, let alone broken ones, so it can't be necessary + * to set indcheckxmin. + */ + reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE; + if (check_constraints) + reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS; - /* Report that we are now reindexing relations */ - pgstat_progress_update_param(PROGRESS_REPACK_PHASE, - PROGRESS_REPACK_PHASE_REBUILD_INDEX); + /* + * Ensure that the indexes have the same persistence as the parent + * relation. + */ + if (newrelpersistence == RELPERSISTENCE_UNLOGGED) + reindex_flags |= REINDEX_REL_FORCE_INDEXES_UNLOGGED; + else if (newrelpersistence == RELPERSISTENCE_PERMANENT) + reindex_flags |= REINDEX_REL_FORCE_INDEXES_PERMANENT; - reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params); + /* Report that we are now reindexing relations */ + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_REBUILD_INDEX); + + reindex_relation(NULL, OIDOldHeap, reindex_flags, &reindex_params); + } /* Report that we are now doing clean up */ pgstat_progress_update_param(PROGRESS_REPACK_PHASE, @@ -1825,89 +2160,1253 @@ cluster_is_permitted_for_relation(Oid relid, Oid userid, ClusterCommand cmd) return false; } +#define REPL_PLUGIN_NAME "pgoutput_repack" + /* - * REPACK is intended to be a replacement of both CLUSTER and VACUUM FULL. + * Call this function before REPACK CONCURRENTLY starts to setup logical + * decoding. It makes sure that other users of the table put enough + * information into WAL. + * + * The point is that at various places we expect that the table we're + * processing is treated like a system catalog. For example, we need to be + * able to scan it using a "historic snapshot" anytime during the processing + * (as opposed to scanning only at the start point of the decoding, as logical + * replication does during initial table synchronization), in order to apply + * concurrent UPDATE / DELETE commands. + * + * Note that TOAST table needs no attention here as it's not scanned using + * historic snapshot. */ -void -repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) +static void +begin_concurrent_repack(Relation rel) { - ListCell *lc; - ClusterParams params = {0}; - bool verbose = false; - Relation rel = NULL; - Oid indexOid = InvalidOid; - MemoryContext repack_context; - List *rtcs; + Oid toastrelid; - /* Parse option list */ - foreach(lc, stmt->params) + /* Avoid logical decoding of other relations by this backend. */ + repacked_rel_locator = rel->rd_locator; + toastrelid = rel->rd_rel->reltoastrelid; + if (OidIsValid(toastrelid)) { - DefElem *opt = (DefElem *) lfirst(lc); + Relation toastrel; - if (strcmp(opt->defname, "verbose") == 0) - verbose = defGetBoolean(opt); - else - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized REPACK option \"%s\"", - opt->defname), - parser_errposition(pstate, opt->location))); + /* Avoid logical decoding of other TOAST relations. */ + toastrel = table_open(toastrelid, AccessShareLock); + repacked_rel_toast_locator = toastrel->rd_locator; + table_close(toastrel, AccessShareLock); } +} - params.options = (verbose ? CLUOPT_VERBOSE : 0); +/* + * Call this when done with REPACK CONCURRENTLY. + */ +static void +end_concurrent_repack(void) +{ + /* + * Restore normal function of (future) logical decoding for this backend. + */ + repacked_rel_locator.relNumber = InvalidOid; + repacked_rel_toast_locator.relNumber = InvalidOid; +} - if (stmt->relation != NULL) - { - rel = process_single_relation(stmt->relation, stmt->indexname, - CLUSTER_COMMAND_REPACK, ¶ms, - &indexOid); - if (rel == NULL) - return; - } +/* + * This function is much like pg_create_logical_replication_slot() except that + * the new slot is neither released (if anyone else could read changes from + * our slot, we could miss changes other backends do while we copy the + * existing data into temporary table), nor persisted (it's easier to handle + * crash by restarting all the work from scratch). + */ +static LogicalDecodingContext * +setup_logical_decoding(Oid relid, const char *slotname, TupleDesc tupdesc) +{ + LogicalDecodingContext *ctx; + RepackDecodingState *dstate; /* - * By here, we know we are in a multi-table situation. In order to avoid - * holding locks for too long, we want to process each table in its own - * transaction. This forces us to disallow running inside a user - * transaction block. + * Check if we can use logical decoding. */ - PreventInTransactionBlock(isTopLevel, "REPACK"); + CheckSlotPermissions(); + CheckLogicalDecodingRequirements(); - /* Also, we need a memory context to hold our list of relations */ - repack_context = AllocSetContextCreate(PortalContext, - "Repack", - ALLOCSET_DEFAULT_SIZES); + /* RS_TEMPORARY so that the slot gets cleaned up on ERROR. */ + ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, false, false); - params.options |= CLUOPT_RECHECK; - if (rel != NULL) - { - Oid relid; - bool rel_is_index; + /* + * Neither prepare_write nor do_write callback nor update_progress is + * useful for us. + * + * Regarding the value of need_full_snapshot, we pass false because the + * table we are processing is present in RepackedRelsHash and therefore, + * regarding logical decoding, treated like a catalog. + */ + ctx = CreateInitDecodingContext(REPL_PLUGIN_NAME, + NIL, + false, + InvalidXLogRecPtr, + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); - Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + /* + * We don't have control on setting fast_forward, so at least check it. + */ + Assert(!ctx->fast_forward); - if (OidIsValid(indexOid)) - { - relid = indexOid; - rel_is_index = true; - } - else + DecodingContextFindStartpoint(ctx); + + /* Some WAL records should have been read. */ + Assert(ctx->reader->EndRecPtr != InvalidXLogRecPtr); + + XLByteToSeg(ctx->reader->EndRecPtr, repack_current_segment, + wal_segment_size); + + /* + * Setup structures to store decoded changes. + */ + dstate = palloc0(sizeof(RepackDecodingState)); + dstate->relid = relid; + dstate->tstore = tuplestore_begin_heap(false, false, + maintenance_work_mem); + + dstate->tupdesc = tupdesc; + + /* Initialize the descriptor to store the changes ... */ + dstate->tupdesc_change = CreateTemplateTupleDesc(1); + + TupleDescInitEntry(dstate->tupdesc_change, 1, NULL, BYTEAOID, -1, 0); + /* ... as well as the corresponding slot. */ + dstate->tsslot = MakeSingleTupleTableSlot(dstate->tupdesc_change, + &TTSOpsMinimalTuple); + + dstate->resowner = ResourceOwnerCreate(CurrentResourceOwner, + "logical decoding"); + + ctx->output_writer_private = dstate; + return ctx; +} + +/* + * Retrieve tuple from ConcurrentChange structure. + * + * The input data starts with the structure but it might not be appropriately + * aligned. + */ +static HeapTuple +get_changed_tuple(char *change) +{ + HeapTupleData tup_data; + HeapTuple result; + char *src; + + /* + * Ensure alignment before accessing the fields. (This is why we can't use + * heap_copytuple() instead of this function.) + */ + src = change + offsetof(ConcurrentChange, tup_data); + memcpy(&tup_data, src, sizeof(HeapTupleData)); + + result = (HeapTuple) palloc(HEAPTUPLESIZE + tup_data.t_len); + memcpy(result, &tup_data, sizeof(HeapTupleData)); + result->t_data = (HeapTupleHeader) ((char *) result + HEAPTUPLESIZE); + src = change + SizeOfConcurrentChange; + memcpy(result->t_data, src, result->t_len); + + return result; +} + +/* + * Decode logical changes from the WAL sequence up to end_of_wal. + */ +void +repack_decode_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal) +{ + RepackDecodingState *dstate; + ResourceOwner resowner_old; + + /* + * Invalidate the "present" cache before moving to "(recent) history". + */ + InvalidateSystemCaches(); + + dstate = (RepackDecodingState *) ctx->output_writer_private; + resowner_old = CurrentResourceOwner; + CurrentResourceOwner = dstate->resowner; + + PG_TRY(); + { + while (ctx->reader->EndRecPtr < end_of_wal) { - relid = RelationGetRelid(rel); - rel_is_index = false; - } - rtcs = get_tables_to_cluster_partitioned(repack_context, relid, - rel_is_index, - CLUSTER_COMMAND_REPACK); + XLogRecord *record; + XLogSegNo segno_new; + char *errm = NULL; + XLogRecPtr end_lsn; - /* close relation, releasing lock on parent table */ - table_close(rel, AccessExclusiveLock); + record = XLogReadRecord(ctx->reader, &errm); + if (errm) + elog(ERROR, "%s", errm); + + if (record != NULL) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + /* + * If WAL segment boundary has been crossed, inform the decoding + * system that the catalog_xmin can advance. (We can confirm more + * often, but a filling a single WAL segment should not take much + * time.) + */ + end_lsn = ctx->reader->EndRecPtr; + XLByteToSeg(end_lsn, segno_new, wal_segment_size); + if (segno_new != repack_current_segment) + { + LogicalConfirmReceivedLocation(end_lsn); + elog(DEBUG1, "REPACK: confirmed receive location %X/%X", + (uint32) (end_lsn >> 32), (uint32) end_lsn); + repack_current_segment = segno_new; + } + + CHECK_FOR_INTERRUPTS(); + } + InvalidateSystemCaches(); + CurrentResourceOwner = resowner_old; } - else - rtcs = get_tables_to_repack(repack_context); + PG_CATCH(); + { + /* clear all timetravel entries */ + InvalidateSystemCaches(); + CurrentResourceOwner = resowner_old; + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* + * Apply changes that happened during the initial load. + * + * Scan key is passed by caller, so it does not have to be constructed + * multiple times. Key entries have all fields initialized, except for + * sk_argument. + */ +static void +apply_concurrent_changes(RepackDecodingState *dstate, Relation rel, + ScanKey key, int nkeys, IndexInsertState *iistate) +{ + TupleTableSlot *index_slot, + *ident_slot; + HeapTuple tup_old = NULL; + + if (dstate->nchanges == 0) + return; + + /* TupleTableSlot is needed to pass the tuple to ExecInsertIndexTuples(). */ + index_slot = MakeSingleTupleTableSlot(dstate->tupdesc, &TTSOpsHeapTuple); + + /* A slot to fetch tuples from identity index. */ + ident_slot = table_slot_create(rel, NULL); + + while (tuplestore_gettupleslot(dstate->tstore, true, false, + dstate->tsslot)) + { + bool shouldFree; + HeapTuple tup_change, + tup, + tup_exist; + char *change_raw, + *src; + ConcurrentChange change; + bool isnull[1]; + Datum values[1]; + + CHECK_FOR_INTERRUPTS(); + + /* Get the change from the single-column tuple. */ + tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree); + heap_deform_tuple(tup_change, dstate->tupdesc_change, values, isnull); + Assert(!isnull[0]); + + /* Make sure we access aligned data. */ + change_raw = (char *) DatumGetByteaP(values[0]); + src = (char *) VARDATA(change_raw); + memcpy(&change, src, SizeOfConcurrentChange); + + /* TRUNCATE change contains no tuple, so process it separately. */ + if (change.kind == CHANGE_TRUNCATE) + { + /* + * All the things that ExecuteTruncateGuts() does (such as firing + * triggers or handling the DROP_CASCADE behavior) should have + * taken place on the source relation. Thus we only do the actual + * truncation of the new relation (and its indexes). + */ + heap_truncate_one_rel(rel); + + pfree(tup_change); + continue; + } + + /* + * Extract the tuple from the change. The tuple is copied here because + * it might be assigned to 'tup_old', in which case it needs to + * survive into the next iteration. + */ + tup = get_changed_tuple(src); + + if (change.kind == CHANGE_UPDATE_OLD) + { + Assert(tup_old == NULL); + tup_old = tup; + } + else if (change.kind == CHANGE_INSERT) + { + Assert(tup_old == NULL); + + apply_concurrent_insert(rel, &change, tup, iistate, index_slot); + + pfree(tup); + } + else if (change.kind == CHANGE_UPDATE_NEW || + change.kind == CHANGE_DELETE) + { + IndexScanDesc ind_scan = NULL; + HeapTuple tup_key; + + if (change.kind == CHANGE_UPDATE_NEW) + { + tup_key = tup_old != NULL ? tup_old : tup; + } + else + { + Assert(tup_old == NULL); + tup_key = tup; + } + + /* + * Find the tuple to be updated or deleted. + */ + tup_exist = find_target_tuple(rel, key, nkeys, tup_key, + iistate, ident_slot, &ind_scan); + if (tup_exist == NULL) + elog(ERROR, "Failed to find target tuple"); + + if (change.kind == CHANGE_UPDATE_NEW) + apply_concurrent_update(rel, tup, tup_exist, &change, iistate, + index_slot); + else + apply_concurrent_delete(rel, tup_exist, &change); + + if (tup_old != NULL) + { + pfree(tup_old); + tup_old = NULL; + } + + pfree(tup); + index_endscan(ind_scan); + } + else + elog(ERROR, "Unrecognized kind of change: %d", change.kind); + + /* If there's any change, make it visible to the next iteration. */ + if (change.kind != CHANGE_UPDATE_OLD) + { + CommandCounterIncrement(); + UpdateActiveSnapshotCommandId(); + } + + /* TTSOpsMinimalTuple has .get_heap_tuple==NULL. */ + Assert(shouldFree); + pfree(tup_change); + } + + tuplestore_clear(dstate->tstore); + dstate->nchanges = 0; + + /* Cleanup. */ + ExecDropSingleTupleTableSlot(index_slot); + ExecDropSingleTupleTableSlot(ident_slot); +} + +static void +apply_concurrent_insert(Relation rel, ConcurrentChange *change, HeapTuple tup, + IndexInsertState *iistate, TupleTableSlot *index_slot) +{ + List *recheck; + + + simple_heap_insert(rel, tup); + + /* + * Update indexes. + * + * In case functions in the index need the active snapshot and caller + * hasn't set one. + */ + ExecStoreHeapTuple(tup, index_slot, false); + recheck = ExecInsertIndexTuples(iistate->rri, + index_slot, + iistate->estate, + false, /* update */ + false, /* noDupErr */ + NULL, /* specConflict */ + NIL, /* arbiterIndexes */ + false /* onlySummarizing */ + ); + + /* + * If recheck is required, it must have been preformed on the source + * relation by now. (All the logical changes we process here are already + * committed.) + */ + list_free(recheck); + + pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1); +} + +static void +apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, + ConcurrentChange *change, IndexInsertState *iistate, + TupleTableSlot *index_slot) +{ + List *recheck; + TU_UpdateIndexes update_indexes; + + /* + * Write the new tuple into the new heap. ('tup' gets the TID assigned + * here.) + */ + simple_heap_update(rel, &tup_target->t_self, tup, &update_indexes); + + ExecStoreHeapTuple(tup, index_slot, false); + + if (update_indexes != TU_None) + { + recheck = ExecInsertIndexTuples(iistate->rri, + index_slot, + iistate->estate, + true, /* update */ + false, /* noDupErr */ + NULL, /* specConflict */ + NIL, /* arbiterIndexes */ + /* onlySummarizing */ + update_indexes == TU_Summarizing); + list_free(recheck); + } + + pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1); +} + +static void +apply_concurrent_delete(Relation rel, HeapTuple tup_target, + ConcurrentChange *change) +{ + simple_heap_delete(rel, &tup_target->t_self); + + pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_DELETED, 1); +} + +/* + * Find the tuple to be updated or deleted. + * + * 'key' is a pre-initialized scan key, into which the function will put the + * key values. + * + * 'tup_key' is a tuple containing the key values for the scan. + * + * On exit,'*scan_p' contains the scan descriptor used. The caller must close + * it when he no longer needs the tuple returned. + */ +static HeapTuple +find_target_tuple(Relation rel, ScanKey key, int nkeys, HeapTuple tup_key, + IndexInsertState *iistate, + TupleTableSlot *ident_slot, IndexScanDesc *scan_p) +{ + IndexScanDesc scan; + Form_pg_index ident_form; + int2vector *ident_indkey; + HeapTuple result = NULL; + + /* XXX no instrumentation for now */ + scan = index_beginscan(rel, iistate->ident_index, GetActiveSnapshot(), + NULL, nkeys, 0); + *scan_p = scan; + index_rescan(scan, key, nkeys, NULL, 0); + + /* Info needed to retrieve key values from heap tuple. */ + ident_form = iistate->ident_index->rd_index; + ident_indkey = &ident_form->indkey; + + /* Use the incoming tuple to finalize the scan key. */ + for (int i = 0; i < scan->numberOfKeys; i++) + { + ScanKey entry; + bool isnull; + int16 attno_heap; + + entry = &scan->keyData[i]; + attno_heap = ident_indkey->values[i]; + entry->sk_argument = heap_getattr(tup_key, + attno_heap, + rel->rd_att, + &isnull); + Assert(!isnull); + } + if (index_getnext_slot(scan, ForwardScanDirection, ident_slot)) + { + bool shouldFree; + + result = ExecFetchSlotHeapTuple(ident_slot, false, &shouldFree); + /* TTSOpsBufferHeapTuple has .get_heap_tuple != NULL. */ + Assert(!shouldFree); + } + + return result; +} + +/* + * Decode and apply concurrent changes. + * + * Pass rel_src iff its reltoastrelid is needed. + */ +static void +process_concurrent_changes(LogicalDecodingContext *ctx, XLogRecPtr end_of_wal, + Relation rel_dst, Relation rel_src, ScanKey ident_key, + int ident_key_nentries, IndexInsertState *iistate) +{ + RepackDecodingState *dstate; + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_CATCH_UP); + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + repack_decode_concurrent_changes(ctx, end_of_wal); + + if (dstate->nchanges == 0) + return; + + PG_TRY(); + { + /* + * Make sure that TOAST values can eventually be accessed via the old + * relation - see comment in copy_table_data(). + */ + if (rel_src) + rel_dst->rd_toastoid = rel_src->rd_rel->reltoastrelid; + + apply_concurrent_changes(dstate, rel_dst, ident_key, + ident_key_nentries, iistate); + } + PG_FINALLY(); + { + if (rel_src) + rel_dst->rd_toastoid = InvalidOid; + } + PG_END_TRY(); +} + +static IndexInsertState * +get_index_insert_state(Relation relation, Oid ident_index_id) +{ + EState *estate; + int i; + IndexInsertState *result; + + result = (IndexInsertState *) palloc0(sizeof(IndexInsertState)); + estate = CreateExecutorState(); + + result->rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo)); + InitResultRelInfo(result->rri, relation, 0, 0, 0); + ExecOpenIndices(result->rri, false); + + /* + * Find the relcache entry of the identity index so that we spend no extra + * effort to open / close it. + */ + for (i = 0; i < result->rri->ri_NumIndices; i++) + { + Relation ind_rel; + + ind_rel = result->rri->ri_IndexRelationDescs[i]; + if (ind_rel->rd_id == ident_index_id) + result->ident_index = ind_rel; + } + if (result->ident_index == NULL) + elog(ERROR, "Failed to open identity index"); + + /* Only initialize fields needed by ExecInsertIndexTuples(). */ + result->estate = estate; + + return result; +} + +/* + * Build scan key to process logical changes. + */ +static ScanKey +build_identity_key(Oid ident_idx_oid, Relation rel_src, int *nentries) +{ + Relation ident_idx_rel; + Form_pg_index ident_idx; + int n, + i; + ScanKey result; + + Assert(OidIsValid(ident_idx_oid)); + ident_idx_rel = index_open(ident_idx_oid, AccessShareLock); + ident_idx = ident_idx_rel->rd_index; + n = ident_idx->indnatts; + result = (ScanKey) palloc(sizeof(ScanKeyData) * n); + for (i = 0; i < n; i++) + { + ScanKey entry; + int16 relattno; + Form_pg_attribute att; + Oid opfamily, + opcintype, + opno, + opcode; + + entry = &result[i]; + relattno = ident_idx->indkey.values[i]; + if (relattno >= 1) + { + TupleDesc desc; + + desc = rel_src->rd_att; + att = TupleDescAttr(desc, relattno - 1); + } + else + elog(ERROR, "Unexpected attribute number %d in index", relattno); + + opfamily = ident_idx_rel->rd_opfamily[i]; + opcintype = ident_idx_rel->rd_opcintype[i]; + opno = get_opfamily_member(opfamily, opcintype, opcintype, + BTEqualStrategyNumber); + + if (!OidIsValid(opno)) + elog(ERROR, "Failed to find = operator for type %u", opcintype); + + opcode = get_opcode(opno); + if (!OidIsValid(opcode)) + elog(ERROR, "Failed to find = operator for operator %u", opno); + + /* Initialize everything but argument. */ + ScanKeyInit(entry, + i + 1, + BTEqualStrategyNumber, opcode, + (Datum) NULL); + entry->sk_collation = att->attcollation; + } + index_close(ident_idx_rel, AccessShareLock); + + *nentries = n; + return result; +} + +static void +free_index_insert_state(IndexInsertState *iistate) +{ + ExecCloseIndices(iistate->rri); + FreeExecutorState(iistate->estate); + pfree(iistate->rri); + pfree(iistate); +} + +static void +cleanup_logical_decoding(LogicalDecodingContext *ctx) +{ + RepackDecodingState *dstate; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + ExecDropSingleTupleTableSlot(dstate->tsslot); + FreeTupleDesc(dstate->tupdesc_change); + FreeTupleDesc(dstate->tupdesc); + tuplestore_end(dstate->tstore); + + FreeDecodingContext(ctx); +} + +/* + * The final steps of rebuild_relation() for concurrent processing. + * + * On entry, NewHeap is locked in AccessExclusiveLock mode. OldHeap and its + * clustering index (if one is passed) are still locked in a mode that allows + * concurrent data changes. On exit, both tables and their indexes are closed, + * but locked in AccessExclusiveLock mode. + */ +static void +rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, + Relation cl_index, + LogicalDecodingContext *ctx, + bool swap_toast_by_content, + TransactionId frozenXid, + MultiXactId cutoffMulti) +{ + LOCKMODE lockmode_old PG_USED_FOR_ASSERTS_ONLY; + List *ind_oids_new; + Oid old_table_oid = RelationGetRelid(OldHeap); + Oid new_table_oid = RelationGetRelid(NewHeap); + List *ind_oids_old = RelationGetIndexList(OldHeap); + ListCell *lc, + *lc2; + char relpersistence; + bool is_system_catalog; + Oid ident_idx_old, + ident_idx_new; + IndexInsertState *iistate; + ScanKey ident_key; + int ident_key_nentries; + XLogRecPtr wal_insert_ptr, + end_of_wal; + char dummy_rec_data = '\0'; + Relation *ind_refs, + *ind_refs_p; + int nind; + + /* Like in cluster_rel(). */ + lockmode_old = ShareUpdateExclusiveLock; + Assert(CheckRelationLockedByMe(OldHeap, lockmode_old, false)); + Assert(cl_index == NULL || + CheckRelationLockedByMe(cl_index, lockmode_old, false)); + /* This is expected from the caller. */ + Assert(CheckRelationLockedByMe(NewHeap, AccessExclusiveLock, false)); + + ident_idx_old = RelationGetReplicaIndex(OldHeap); + + /* + * Unlike the exclusive case, we build new indexes for the new relation + * rather than swapping the storage and reindexing the old relation. The + * point is that the index build can take some time, so we do it before we + * get AccessExclusiveLock on the old heap and therefore we cannot swap + * the heap storage yet. + * + * index_create() will lock the new indexes using AccessExclusiveLock - no + * need to change that. + */ + ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old); + + /* + * Processing shouldn't start w/o valid identity index. + */ + Assert(OidIsValid(ident_idx_old)); + + /* Find "identity index" on the new relation. */ + ident_idx_new = InvalidOid; + forboth(lc, ind_oids_old, lc2, ind_oids_new) + { + Oid ind_old = lfirst_oid(lc); + Oid ind_new = lfirst_oid(lc2); + + if (ident_idx_old == ind_old) + { + ident_idx_new = ind_new; + break; + } + } + if (!OidIsValid(ident_idx_new)) + + /* + * Should not happen, given our lock on the old relation. + */ + ereport(ERROR, + (errmsg("Identity index missing on the new relation"))); + + /* Executor state to update indexes. */ + iistate = get_index_insert_state(NewHeap, ident_idx_new); + + /* + * Build scan key that we'll use to look for rows to be updated / deleted + * during logical decoding. + */ + ident_key = build_identity_key(ident_idx_new, OldHeap, &ident_key_nentries); + + /* + * Flush all WAL records inserted so far (possibly except for the last + * incomplete page, see GetInsertRecPtr), to minimize the amount of data + * we need to flush while holding exclusive lock on the source table. + */ + wal_insert_ptr = GetInsertRecPtr(); + XLogFlush(wal_insert_ptr); + end_of_wal = GetFlushRecPtr(NULL); + + /* + * Apply concurrent changes first time, to minimize the time we need to + * hold AccessExclusiveLock. (Quite some amount of WAL could have been + * written during the data copying and index creation.) + */ + process_concurrent_changes(ctx, end_of_wal, NewHeap, + swap_toast_by_content ? OldHeap : NULL, + ident_key, ident_key_nentries, iistate); + + /* + * Acquire AccessExclusiveLock on the table, its TOAST relation (if there + * is one), all its indexes, so that we can swap the files. + * + * Before that, unlock the index temporarily to avoid deadlock in case + * another transaction is trying to lock it while holding the lock on the + * table. + */ + if (cl_index) + { + index_close(cl_index, ShareUpdateExclusiveLock); + cl_index = NULL; + } + /* For the same reason, unlock TOAST relation. */ + if (OldHeap->rd_rel->reltoastrelid) + LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock); + /* Finally lock the table */ + LockRelationOid(old_table_oid, AccessExclusiveLock); + + /* + * Lock all indexes now, not only the clustering one: all indexes need to + * have their files swapped. While doing that, store their relation + * references in an array, to handle predicate locks below. + */ + ind_refs_p = ind_refs = palloc_array(Relation, list_length(ind_oids_old)); + nind = 0; + foreach(lc, ind_oids_old) + { + Oid ind_oid; + Relation index; + + ind_oid = lfirst_oid(lc); + index = index_open(ind_oid, AccessExclusiveLock); + *ind_refs_p = index; + ind_refs_p++; + nind++; + } + + /* + * In addition, lock the OldHeap's TOAST relation exclusively - again, the + * lock is needed to swap the files. + */ + if (OidIsValid(OldHeap->rd_rel->reltoastrelid)) + LockRelationOid(OldHeap->rd_rel->reltoastrelid, AccessExclusiveLock); + + /* + * Tuples and pages of the old heap will be gone, but the heap will stay. + */ + TransferPredicateLocksToHeapRelation(OldHeap); + /* The same for indexes. */ + for (int i = 0; i < nind; i++) + { + Relation index = ind_refs[i]; + + TransferPredicateLocksToHeapRelation(index); + + /* + * References to indexes on the old relation are not needed anymore, + * however locks stay till the end of the transaction. + */ + index_close(index, NoLock); + } + pfree(ind_refs); + + /* + * Flush anything we see in WAL, to make sure that all changes committed + * while we were waiting for the exclusive lock are available for + * decoding. This should not be necessary if all backends had + * synchronous_commit set, but we can't rely on this setting. + * + * Unfortunately, GetInsertRecPtr() may lag behind the actual insert + * position, and GetLastImportantRecPtr() points at the start of the last + * record rather than at the end. Thus the simplest way to determine the + * insert position is to insert a dummy record and use its LSN. + * + * XXX Consider using GetLastImportantRecPtr() and adding the size of the + * last record (plus the total size of all the page headers the record + * spans)? + */ + XLogBeginInsert(); + XLogRegisterData(&dummy_rec_data, 1); + wal_insert_ptr = XLogInsert(RM_XLOG_ID, XLOG_NOOP); + XLogFlush(wal_insert_ptr); + end_of_wal = GetFlushRecPtr(NULL); + + /* Apply the concurrent changes again. */ + process_concurrent_changes(ctx, end_of_wal, NewHeap, + swap_toast_by_content ? OldHeap : NULL, + ident_key, ident_key_nentries, iistate); + + /* Remember info about rel before closing OldHeap */ + relpersistence = OldHeap->rd_rel->relpersistence; + is_system_catalog = IsSystemRelation(OldHeap); + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_SWAP_REL_FILES); + + /* + * Even ShareUpdateExclusiveLock should have prevented others from + * creating / dropping indexes (even using the CONCURRENTLY option), so we + * do not need to check whether the lists match. + */ + forboth(lc, ind_oids_old, lc2, ind_oids_new) + { + Oid ind_old = lfirst_oid(lc); + Oid ind_new = lfirst_oid(lc2); + Oid mapped_tables[4]; + + /* Zero out possible results from swapped_relation_files */ + memset(mapped_tables, 0, sizeof(mapped_tables)); + + swap_relation_files(ind_old, ind_new, + (old_table_oid == RelationRelationId), + swap_toast_by_content, + true, + InvalidTransactionId, + InvalidMultiXactId, + mapped_tables); + +#ifdef USE_ASSERT_CHECKING + + /* + * Concurrent processing is not supported for system relations, so + * there should be no mapped tables. + */ + for (int i = 0; i < 4; i++) + Assert(mapped_tables[i] == 0); +#endif + } + + /* The new indexes must be visible for deletion. */ + CommandCounterIncrement(); + + /* Close the old heap but keep lock until transaction commit. */ + table_close(OldHeap, NoLock); + /* Close the new heap. (We didn't have to open its indexes). */ + table_close(NewHeap, NoLock); + + /* Cleanup what we don't need anymore. (And close the identity index.) */ + pfree(ident_key); + free_index_insert_state(iistate); + + /* + * Swap the relations and their TOAST relations and TOAST indexes. This + * also drops the new relation and its indexes. + * + * (System catalogs are currently not supported.) + */ + Assert(!is_system_catalog); + finish_heap_swap(old_table_oid, new_table_oid, + is_system_catalog, + swap_toast_by_content, + false, true, false, + frozenXid, cutoffMulti, + relpersistence); +} + +/* + * Build indexes on NewHeap according to those on OldHeap. + * + * OldIndexes is the list of index OIDs on OldHeap. + * + * A list of OIDs of the corresponding indexes created on NewHeap is + * returned. The order of items does match, so we can use these arrays to swap + * index storage. + */ +static List * +build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes) +{ + StringInfo ind_name; + ListCell *lc; + List *result = NIL; + + pgstat_progress_update_param(PROGRESS_REPACK_PHASE, + PROGRESS_REPACK_PHASE_REBUILD_INDEX); + + ind_name = makeStringInfo(); + + foreach(lc, OldIndexes) + { + Oid ind_oid, + ind_oid_new, + tbsp_oid; + Relation ind; + IndexInfo *ind_info; + int i, + heap_col_id; + List *colnames; + int16 indnatts; + Oid *collations, + *opclasses; + HeapTuple tup; + bool isnull; + Datum d; + oidvector *oidvec; + int2vector *int2vec; + size_t oid_arr_size; + size_t int2_arr_size; + int16 *indoptions; + text *reloptions = NULL; + bits16 flags; + Datum *opclassOptions; + NullableDatum *stattargets; + + ind_oid = lfirst_oid(lc); + ind = index_open(ind_oid, AccessShareLock); + ind_info = BuildIndexInfo(ind); + + tbsp_oid = ind->rd_rel->reltablespace; + + /* + * Index name really doesn't matter, we'll eventually use only their + * storage. Just make them unique within the table. + */ + resetStringInfo(ind_name); + appendStringInfo(ind_name, "ind_%d", + list_cell_number(OldIndexes, lc)); + + flags = 0; + if (ind->rd_index->indisprimary) + flags |= INDEX_CREATE_IS_PRIMARY; + + colnames = NIL; + indnatts = ind->rd_index->indnatts; + oid_arr_size = sizeof(Oid) * indnatts; + int2_arr_size = sizeof(int16) * indnatts; + + collations = (Oid *) palloc(oid_arr_size); + for (i = 0; i < indnatts; i++) + { + char *colname; + + heap_col_id = ind->rd_index->indkey.values[i]; + if (heap_col_id > 0) + { + Form_pg_attribute att; + + /* Normal attribute. */ + att = TupleDescAttr(OldHeap->rd_att, heap_col_id - 1); + colname = pstrdup(NameStr(att->attname)); + collations[i] = att->attcollation; + } + else if (heap_col_id == 0) + { + HeapTuple tuple; + Form_pg_attribute att; + + /* + * Expression column is not present in relcache. What we need + * here is an attribute of the *index* relation. + */ + tuple = SearchSysCache2(ATTNUM, + ObjectIdGetDatum(ind_oid), + Int16GetDatum(i + 1)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, + "cache lookup failed for attribute %d of relation %u", + i + 1, ind_oid); + att = (Form_pg_attribute) GETSTRUCT(tuple); + colname = pstrdup(NameStr(att->attname)); + collations[i] = att->attcollation; + ReleaseSysCache(tuple); + } + else + elog(ERROR, "Unexpected column number: %d", + heap_col_id); + + colnames = lappend(colnames, colname); + } + + /* + * Special effort needed for variable length attributes of + * Form_pg_index. + */ + tup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(ind_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for index %u", ind_oid); + d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indclass, &isnull); + Assert(!isnull); + oidvec = (oidvector *) DatumGetPointer(d); + opclasses = (Oid *) palloc(oid_arr_size); + memcpy(opclasses, oidvec->values, oid_arr_size); + + d = SysCacheGetAttr(INDEXRELID, tup, Anum_pg_index_indoption, + &isnull); + Assert(!isnull); + int2vec = (int2vector *) DatumGetPointer(d); + indoptions = (int16 *) palloc(int2_arr_size); + memcpy(indoptions, int2vec->values, int2_arr_size); + ReleaseSysCache(tup); + + tup = SearchSysCache1(RELOID, ObjectIdGetDatum(ind_oid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for index relation %u", ind_oid); + d = SysCacheGetAttr(RELOID, tup, Anum_pg_class_reloptions, &isnull); + reloptions = !isnull ? DatumGetTextPCopy(d) : NULL; + ReleaseSysCache(tup); + + opclassOptions = palloc0(sizeof(Datum) * ind_info->ii_NumIndexAttrs); + for (i = 0; i < ind_info->ii_NumIndexAttrs; i++) + opclassOptions[i] = get_attoptions(ind_oid, i + 1); + + stattargets = get_index_stattargets(ind_oid, ind_info); + + /* + * Neither parentIndexRelid nor parentConstraintId needs to be passed + * since the new catalog entries (pg_constraint, pg_inherits) would + * eventually be dropped. Therefore there's no need to record valid + * dependency on parents. + */ + ind_oid_new = index_create(NewHeap, + ind_name->data, + InvalidOid, + InvalidOid, /* parentIndexRelid */ + InvalidOid, /* parentConstraintId */ + InvalidOid, + ind_info, + colnames, + ind->rd_rel->relam, + tbsp_oid, + collations, + opclasses, + opclassOptions, + indoptions, + stattargets, + PointerGetDatum(reloptions), + flags, /* flags */ + 0, /* constr_flags */ + false, /* allow_system_table_mods */ + false, /* is_internal */ + NULL /* constraintId */ + ); + result = lappend_oid(result, ind_oid_new); + + index_close(ind, AccessShareLock); + list_free_deep(colnames); + pfree(collations); + pfree(opclasses); + pfree(indoptions); + if (reloptions) + pfree(reloptions); + } + + return result; +} + +/* + * REPACK is intended to be a replacement of both CLUSTER and VACUUM FULL. + */ +void +repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) +{ + ListCell *lc; + ClusterParams params = {0}; + bool verbose = false; + Relation rel = NULL; + Oid indexOid = InvalidOid; + MemoryContext repack_context; + List *rtcs; + LOCKMODE lockmode; + + /* Parse option list */ + foreach(lc, stmt->params) + { + DefElem *opt = (DefElem *) lfirst(lc); + + if (strcmp(opt->defname, "verbose") == 0) + verbose = defGetBoolean(opt); + else + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized REPACK option \"%s\"", + opt->defname), + parser_errposition(pstate, opt->location))); + } + + params.options = + (verbose ? CLUOPT_VERBOSE : 0) | + (stmt->concurrent ? CLUOPT_CONCURRENT : 0); + + /* + * Determine the lock mode expected by cluster_rel(). + * + * In the exclusive case, we obtain AccessExclusiveLock right away to + * avoid lock-upgrade hazard in the single-transaction case. In the + * CONCURRENTLY case, the AccessExclusiveLock will only be used at the end + * of processing, supposedly for very short time. Until then, we'll have + * to unlock the relation temporarily, so there's no lock-upgrade hazard. + */ + lockmode = (params.options & CLUOPT_CONCURRENT) == 0 ? + AccessExclusiveLock : ShareUpdateExclusiveLock; + + if (stmt->relation != NULL) + { + rel = process_single_relation(stmt->relation, stmt->indexname, + CLUSTER_COMMAND_REPACK, lockmode, + isTopLevel, ¶ms, &indexOid); + if (rel == NULL) + return; + } + + /* + * By here, we know we are in a multi-table situation. + * + * Concurrent processing is currently considered rather special (e.g. in + * terms of resources consumed) so it is not performed in bulk. + */ + if (params.options & CLUOPT_CONCURRENT) + { + if (rel != NULL) + { + Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + ereport(ERROR, + (errmsg("REPACK CONCURRENTLY not supported for partitioned tables"), + errhint("Consider running the command for individual partitions."))); + } + else + ereport(ERROR, + (errmsg("REPACK CONCURRENTLY requires explicit table name"))); + } + + /* + * In order to avoid holding locks for too long, we want to process each + * table in its own transaction. This forces us to disallow running + * inside a user transaction block. + */ + PreventInTransactionBlock(isTopLevel, "REPACK"); + + /* Also, we need a memory context to hold our list of relations */ + repack_context = AllocSetContextCreate(PortalContext, + "Repack", + ALLOCSET_DEFAULT_SIZES); + + params.options |= CLUOPT_RECHECK; + if (rel != NULL) + { + Oid relid; + bool rel_is_index; + + Assert(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE); + /* See the ereport() above. */ + Assert((params.options & CLUOPT_CONCURRENT) == 0); + + if (OidIsValid(indexOid)) + { + relid = indexOid; + rel_is_index = true; + } + else + { + relid = RelationGetRelid(rel); + rel_is_index = false; + } + rtcs = get_tables_to_cluster_partitioned(repack_context, relid, + rel_is_index, + CLUSTER_COMMAND_REPACK); + + /* close relation, releasing lock on parent table */ + table_close(rel, lockmode); + } + else + rtcs = get_tables_to_repack(repack_context); + + /* Do the job. */ + cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_REPACK, lockmode, + isTopLevel); - /* Do the job. */ - cluster_multiple_rels(rtcs, ¶ms, CLUSTER_COMMAND_REPACK); /* Start a new transaction for the cleanup work. */ StartTransactionCommand(); @@ -1925,7 +3424,8 @@ repack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) */ static Relation process_single_relation(RangeVar *relation, char *indexname, - ClusterCommand cmd, ClusterParams *params, + ClusterCommand cmd, LOCKMODE lockmode, + bool isTopLevel, ClusterParams *params, Oid *indexOid_p) { Relation rel; @@ -1935,12 +3435,10 @@ process_single_relation(RangeVar *relation, char *indexname, Oid tableOid; /* - * Find, lock, and check permissions on the table. We obtain - * AccessExclusiveLock right away to avoid lock-upgrade hazard in the - * single-transaction case. + * Find, lock, and check permissions on the table. */ tableOid = RangeVarGetRelidExtended(relation, - AccessExclusiveLock, + lockmode, 0, RangeVarCallbackMaintainsTable, NULL); @@ -1994,7 +3492,7 @@ process_single_relation(RangeVar *relation, char *indexname, /* For non-partitioned tables, do what we came here to do. */ if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { - cluster_rel(rel, indexOid, params, cmd); + cluster_rel(rel, indexOid, params, cmd, isTopLevel); /* cluster_rel closes the relation, but keeps lock */ return NULL; diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c index e7854add178..df879c2a18d 100644 --- a/src/backend/commands/matview.c +++ b/src/backend/commands/matview.c @@ -904,7 +904,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner, static void refresh_by_heap_swap(Oid matviewOid, Oid OIDNewHeap, char relpersistence) { - finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, + finish_heap_swap(matviewOid, OIDNewHeap, false, false, true, true, true, RecentXmin, ReadNextMultiXactId(), relpersistence); } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index b7a74f25785..2b15e5b1505 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -5970,6 +5970,7 @@ ATRewriteTables(AlterTableStmt *parsetree, List **wqueue, LOCKMODE lockmode, finish_heap_swap(tab->relid, OIDNewHeap, false, false, true, !OidIsValid(tab->newTableSpace), + true, RecentXmin, ReadNextMultiXactId(), persistence); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index a4ad23448f8..f9f8f5ebb58 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -124,7 +124,7 @@ static void vac_truncate_clog(TransactionId frozenXID, TransactionId lastSaneFrozenXid, MultiXactId lastSaneMinMulti); static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, - BufferAccessStrategy bstrategy); + BufferAccessStrategy bstrategy, bool isTopLevel); static double compute_parallel_delay(void); static VacOptValue get_vacoptval_from_boolean(DefElem *def); static bool vac_tid_reaped(ItemPointer itemptr, void *state); @@ -634,7 +634,8 @@ vacuum(List *relations, VacuumParams *params, BufferAccessStrategy bstrategy, if (params->options & VACOPT_VACUUM) { - if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy)) + if (!vacuum_rel(vrel->oid, vrel->relation, params, bstrategy, + isTopLevel)) continue; } @@ -1996,7 +1997,7 @@ vac_truncate_clog(TransactionId frozenXID, */ static bool vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, - BufferAccessStrategy bstrategy) + BufferAccessStrategy bstrategy, bool isTopLevel) { LOCKMODE lmode; Relation rel; @@ -2264,7 +2265,7 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, /* VACUUM FULL is now a variant of CLUSTER; see cluster.c */ cluster_rel(rel, InvalidOid, &cluster_params, - CLUSTER_COMMAND_VACUUM); + CLUSTER_COMMAND_VACUUM, isTopLevel); /* cluster_rel closes the relation, but keeps lock */ rel = NULL; @@ -2310,7 +2311,8 @@ vacuum_rel(Oid relid, RangeVar *relation, VacuumParams *params, toast_vacuum_params.options |= VACOPT_PROCESS_MAIN; toast_vacuum_params.toast_parent = relid; - vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy); + vacuum_rel(toast_relid, NULL, &toast_vacuum_params, bstrategy, + isTopLevel); } /* diff --git a/src/backend/meson.build b/src/backend/meson.build index 2b0db214804..50aa385a581 100644 --- a/src/backend/meson.build +++ b/src/backend/meson.build @@ -194,5 +194,6 @@ pg_test_mod_args = pg_mod_args + { subdir('jit/llvm') subdir('replication/libpqwalreceiver') subdir('replication/pgoutput') +subdir('replication/pgoutput_repack') subdir('snowball') subdir('utils/mb/conversion_procs') diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 9c79265a438..634d0768851 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -11892,27 +11892,30 @@ cluster_index_specification: * * QUERY: * REPACK [ (options) ] [ [ USING INDEX ] ] + * REPACK [ (options) ] CONCURRENTLY [ USING INDEX ] * *****************************************************************************/ RepackStmt: - REPACK qualified_name repack_index_specification + REPACK opt_concurrently qualified_name repack_index_specification { RepackStmt *n = makeNode(RepackStmt); - n->relation = $2; - n->indexname = $3; + n->concurrent = $2; + n->relation = $3; + n->indexname = $4; n->params = NIL; $$ = (Node *) n; } - | REPACK '(' utility_option_list ')' qualified_name repack_index_specification + | REPACK '(' utility_option_list ')' opt_concurrently qualified_name repack_index_specification { RepackStmt *n = makeNode(RepackStmt); - n->relation = $5; - n->indexname = $6; n->params = $3; + n->concurrent = $5; + n->relation = $6; + n->indexname = $7; $$ = (Node *) n; } @@ -11923,6 +11926,7 @@ RepackStmt: n->relation = NULL; n->indexname = NULL; n->params = NIL; + n->concurrent = false; $$ = (Node *) n; } @@ -11933,6 +11937,7 @@ RepackStmt: n->relation = NULL; n->indexname = NULL; n->params = $3; + n->concurrent = false; $$ = (Node *) n; } ; diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 78f9a0a11c4..00f7bbc5f59 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -33,6 +33,7 @@ #include "access/xlogreader.h" #include "access/xlogrecord.h" #include "catalog/pg_control.h" +#include "commands/cluster.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" @@ -467,6 +468,29 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + /* + * Check if REPACK CONCURRENTLY is being performed by this backend. If so, + * only decode data changes of the table that it is processing, and the + * changes of its TOAST relation. + * + * (TOAST locator should not be set unless the main is.) + */ + Assert(!OidIsValid(repacked_rel_toast_locator.relNumber) || + OidIsValid(repacked_rel_locator.relNumber)); + + if (OidIsValid(repacked_rel_locator.relNumber)) + { + XLogReaderState *r = buf->record; + RelFileLocator locator; + + /* Not all records contain the block. */ + if (XLogRecGetBlockTagExtended(r, 0, &locator, NULL, NULL, NULL) && + !RelFileLocatorEquals(locator, repacked_rel_locator) && + (!OidIsValid(repacked_rel_toast_locator.relNumber) || + !RelFileLocatorEquals(locator, repacked_rel_toast_locator))) + return; + } + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e5d2a583ce6..c32e459411b 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -486,6 +486,26 @@ SnapBuildInitialSnapshot(SnapBuild *builder) return SnapBuildMVCCFromHistoric(snap, true); } +/* + * Build an MVCC snapshot for the initial data load performed by REPACK + * CONCURRENTLY command. + * + * The snapshot will only be used to scan one particular relation, which is + * treated like a catalog (therefore ->building_full_snapshot is not + * important), and the caller should already have a replication slot setup (so + * we do not set MyProc->xmin). XXX Do we yet need to add some restrictions? + */ +Snapshot +SnapBuildInitialSnapshotForRepack(SnapBuild *builder) +{ + Snapshot snap; + + Assert(builder->state == SNAPBUILD_CONSISTENT); + + snap = SnapBuildBuildSnapshot(builder); + return SnapBuildMVCCFromHistoric(snap, false); +} + /* * Turn a historic MVCC snapshot into an ordinary MVCC snapshot. * diff --git a/src/backend/replication/pgoutput_repack/Makefile b/src/backend/replication/pgoutput_repack/Makefile new file mode 100644 index 00000000000..4efeb713b70 --- /dev/null +++ b/src/backend/replication/pgoutput_repack/Makefile @@ -0,0 +1,32 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/pgoutput_repack +# +# IDENTIFICATION +# src/backend/replication/pgoutput_repack +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication/pgoutput_repack +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = \ + $(WIN32RES) \ + pgoutput_repack.o +PGFILEDESC = "pgoutput_repack - logical replication output plugin for REPACK command" +NAME = pgoutput_repack + +all: all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/pgoutput_repack/meson.build b/src/backend/replication/pgoutput_repack/meson.build new file mode 100644 index 00000000000..133e865a4a0 --- /dev/null +++ b/src/backend/replication/pgoutput_repack/meson.build @@ -0,0 +1,18 @@ +# Copyright (c) 2022-2024, PostgreSQL Global Development Group + +pgoutput_repack_sources = files( + 'pgoutput_repack.c', +) + +if host_system == 'windows' + pgoutput_repack_sources += rc_lib_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'pgoutput_repack', + '--FILEDESC', 'pgoutput_repack - logical replication output plugin for REPACK command',]) +endif + +pgoutput_repack = shared_module('pgoutput_repack', + pgoutput_repack_sources, + kwargs: pg_mod_args, +) + +backend_targets += pgoutput_repack diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c new file mode 100644 index 00000000000..687fbbc59bb --- /dev/null +++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c @@ -0,0 +1,288 @@ +/*------------------------------------------------------------------------- + * + * pgoutput_cluster.c + * Logical Replication output plugin for REPACK command + * + * Copyright (c) 2012-2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/pgoutput_cluster/pgoutput_cluster.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heaptoast.h" +#include "commands/cluster.h" +#include "replication/snapbuild.h" + +PG_MODULE_MAGIC; + +static void plugin_startup(LogicalDecodingContext *ctx, + OutputPluginOptions *opt, bool is_init); +static void plugin_shutdown(LogicalDecodingContext *ctx); +static void plugin_begin_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void plugin_commit_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation rel, ReorderBufferChange *change); +static void plugin_truncate(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, int nrelations, + Relation relations[], + ReorderBufferChange *change); +static void store_change(LogicalDecodingContext *ctx, + ConcurrentChangeKind kind, HeapTuple tuple); + +void +_PG_output_plugin_init(OutputPluginCallbacks *cb) +{ + AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit); + + cb->startup_cb = plugin_startup; + cb->begin_cb = plugin_begin_txn; + cb->change_cb = plugin_change; + cb->truncate_cb = plugin_truncate; + cb->commit_cb = plugin_commit_txn; + cb->shutdown_cb = plugin_shutdown; +} + + +/* initialize this plugin */ +static void +plugin_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, + bool is_init) +{ + ctx->output_plugin_private = NULL; + + /* Probably unnecessary, as we don't use the SQL interface ... */ + opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; + + if (ctx->output_plugin_options != NIL) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("This plugin does not expect any options"))); + } +} + +static void +plugin_shutdown(LogicalDecodingContext *ctx) +{ +} + +/* + * As we don't release the slot during processing of particular table, there's + * no room for SQL interface, even for debugging purposes. Therefore we need + * neither OutputPluginPrepareWrite() nor OutputPluginWrite() in the plugin + * callbacks. (Although we might want to write custom callbacks, this API + * seems to be unnecessarily generic for our purposes.) + */ + +/* BEGIN callback */ +static void +plugin_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ +} + +/* COMMIT callback */ +static void +plugin_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ +} + +/* + * Callback for individual changed tuples + */ +static void +plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + Relation relation, ReorderBufferChange *change) +{ + RepackDecodingState *dstate; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + /* Only interested in one particular relation. */ + if (relation->rd_id != dstate->relid) + return; + + /* Decode entry depending on its type */ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple newtuple; + + newtuple = change->data.tp.newtuple != NULL ? + change->data.tp.newtuple : NULL; + + /* + * Identity checks in the main function should have made this + * impossible. + */ + if (newtuple == NULL) + elog(ERROR, "Incomplete insert info."); + + store_change(ctx, CHANGE_INSERT, newtuple); + } + break; + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple oldtuple, + newtuple; + + oldtuple = change->data.tp.oldtuple != NULL ? + change->data.tp.oldtuple : NULL; + newtuple = change->data.tp.newtuple != NULL ? + change->data.tp.newtuple : NULL; + + if (newtuple == NULL) + elog(ERROR, "Incomplete update info."); + + if (oldtuple != NULL) + store_change(ctx, CHANGE_UPDATE_OLD, oldtuple); + + store_change(ctx, CHANGE_UPDATE_NEW, newtuple); + } + break; + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple oldtuple; + + oldtuple = change->data.tp.oldtuple ? + change->data.tp.oldtuple : NULL; + + if (oldtuple == NULL) + elog(ERROR, "Incomplete delete info."); + + store_change(ctx, CHANGE_DELETE, oldtuple); + } + break; + default: + /* Should not come here */ + Assert(false); + break; + } +} + +static void +plugin_truncate(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + int nrelations, Relation relations[], + ReorderBufferChange *change) +{ + RepackDecodingState *dstate; + int i; + Relation relation = NULL; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + /* Find the relation we are processing. */ + for (i = 0; i < nrelations; i++) + { + relation = relations[i]; + + if (RelationGetRelid(relation) == dstate->relid) + break; + } + + /* Is this truncation of another relation? */ + if (i == nrelations) + return; + + store_change(ctx, CHANGE_TRUNCATE, NULL); +} + +/* Store concurrent data change. */ +static void +store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, + HeapTuple tuple) +{ + RepackDecodingState *dstate; + char *change_raw; + ConcurrentChange change; + bool flattened = false; + Size size; + Datum values[1]; + bool isnull[1]; + char *dst, + *dst_start; + + dstate = (RepackDecodingState *) ctx->output_writer_private; + + size = MAXALIGN(VARHDRSZ) + SizeOfConcurrentChange; + + if (tuple) + { + /* + * ReorderBufferCommit() stores the TOAST chunks in its private memory + * context and frees them after having called apply_change(). + * Therefore we need flat copy (including TOAST) that we eventually + * copy into the memory context which is available to + * decode_concurrent_changes(). + */ + if (HeapTupleHasExternal(tuple)) + { + /* + * toast_flatten_tuple_to_datum() might be more convenient but we + * don't want the decompression it does. + */ + tuple = toast_flatten_tuple(tuple, dstate->tupdesc); + flattened = true; + } + + size += tuple->t_len; + } + + /* XXX Isn't there any function / macro to do this? */ + if (size >= 0x3FFFFFFF) + elog(ERROR, "Change is too big."); + + /* Construct the change. */ + change_raw = (char *) palloc0(size); + SET_VARSIZE(change_raw, size); + + /* + * Since the varlena alignment might not be sufficient for the structure, + * set the fields in a local instance and remember where it should + * eventually be copied. + */ + change.kind = kind; + dst_start = (char *) VARDATA(change_raw); + + /* No other information is needed for TRUNCATE. */ + if (change.kind == CHANGE_TRUNCATE) + { + memcpy(dst_start, &change, SizeOfConcurrentChange); + goto store; + } + + /* + * Copy the tuple. + * + * CAUTION: change->tup_data.t_data must be fixed on retrieval! + */ + memcpy(&change.tup_data, tuple, sizeof(HeapTupleData)); + dst = dst_start + SizeOfConcurrentChange; + memcpy(dst, tuple->t_data, tuple->t_len); + + /* The data has been copied. */ + if (flattened) + pfree(tuple); + +store: + /* Copy the structure so it can be stored. */ + memcpy(dst_start, &change, SizeOfConcurrentChange); + + /* Store as tuple of 1 bytea column. */ + values[0] = PointerGetDatum(change_raw); + isnull[0] = false; + tuplestore_putvalues(dstate->tstore, dstate->tupdesc_change, + values, isnull); + + /* Accounting. */ + dstate->nchanges++; + + /* Cleanup. */ + pfree(change_raw); +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2fa045e6b0f..e9ddf39500c 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -25,6 +25,7 @@ #include "access/xlogprefetcher.h" #include "access/xlogrecovery.h" #include "commands/async.h" +#include "commands/cluster.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 4f44648aca8..1ee069c34ee 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -351,6 +351,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry." InjectionPoint "Waiting to read or update information related to injection points." SerialControl "Waiting to read or update shared pg_serial state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." +RepackedRels "Waiting to read or update information on tables being repacked concurrently." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 9f54a9e72b7..a495f22876d 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -64,6 +64,7 @@ #include "catalog/pg_type.h" #include "catalog/schemapg.h" #include "catalog/storage.h" +#include "commands/cluster.h" #include "commands/policy.h" #include "commands/publicationcmds.h" #include "commands/trigger.h" diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index 70a6b8902d1..7f1c220e00b 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -213,7 +213,6 @@ static List *exportedSnapshots = NIL; /* Prototypes for local functions */ static void UnregisterSnapshotNoOwner(Snapshot snapshot); -static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); /* ResourceOwner callbacks to track snapshot references */ @@ -646,7 +645,7 @@ CopySnapshot(Snapshot snapshot) * FreeSnapshot * Free the memory associated with a snapshot. */ -static void +void FreeSnapshot(Snapshot snapshot) { Assert(snapshot->regd_count == 0); diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c index 31271786f21..a22e6cb6ccc 100644 --- a/src/bin/psql/tab-complete.in.c +++ b/src/bin/psql/tab-complete.in.c @@ -4914,18 +4914,27 @@ match_previous_words(int pattern_id, } /* REPACK */ - else if (Matches("REPACK")) + else if (Matches("REPACK") || Matches("REPACK", "(*)")) + COMPLETE_WITH_SCHEMA_QUERY_PLUS(Query_for_list_of_clusterables, + "CONCURRENTLY"); + else if (Matches("REPACK", "CONCURRENTLY")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_clusterables); - else if (Matches("REPACK", "(*)")) + else if (Matches("REPACK", "(*)", "CONCURRENTLY")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_clusterables); - /* If we have REPACK , then add "USING INDEX" */ - else if (Matches("REPACK", MatchAnyExcept("("))) + /* If we have REPACK [ CONCURRENTLY ] , then add "USING INDEX" */ + else if (Matches("REPACK", MatchAnyExcept("(|CONCURRENTLY")) || + Matches("REPACK", "CONCURRENTLY", MatchAnyExcept("("))) COMPLETE_WITH("USING INDEX"); - /* If we have REPACK (*) , then add "USING INDEX" */ - else if (Matches("REPACK", "(*)", MatchAny)) + /* If we have REPACK (*) [ CONCURRENTLY ] , then add "USING INDEX" */ + else if (Matches("REPACK", "(*)", MatchAnyExcept("CONCURRENTLY")) || + Matches("REPACK", "(*)", "CONCURRENTLY", MatchAnyExcept("("))) COMPLETE_WITH("USING INDEX"); - /* If we have REPACK USING, then add the index as well */ - else if (Matches("REPACK", MatchAny, "USING", "INDEX")) + + /* + * Complete ... [ (*) ] [ CONCURRENTLY ] USING INDEX, with a list of + * indexes for . + */ + else if (TailMatches(MatchAnyExcept("(|CONCURRENTLY"), "USING", "INDEX")) { set_completion_reference(prev3_wd); COMPLETE_WITH_SCHEMA_QUERY(Query_for_index_of_table); diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 1640d9c32f7..bdeb2f83540 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -421,6 +421,10 @@ extern HTSV_Result HeapTupleSatisfiesVacuumHorizon(HeapTuple htup, Buffer buffer TransactionId *dead_after); extern void HeapTupleSetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid); +extern bool HeapTupleMVCCInserted(HeapTuple htup, Snapshot snapshot, + Buffer buffer); +extern bool HeapTupleMVCCNotDeleted(HeapTuple htup, Snapshot snapshot, + Buffer buffer); extern bool HeapTupleHeaderIsOnlyLocked(HeapTupleHeader tuple); extern bool HeapTupleIsSurelyDead(HeapTuple htup, struct GlobalVisState *vistest); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index b8cb1e744ad..b1ca73d6ea5 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -21,6 +21,7 @@ #include "access/sdir.h" #include "access/xact.h" #include "executor/tuptable.h" +#include "replication/logical.h" #include "storage/read_stream.h" #include "utils/rel.h" #include "utils/snapshot.h" @@ -630,6 +631,8 @@ typedef struct TableAmRoutine Relation OldIndex, bool use_sort, TransactionId OldestXmin, + Snapshot snapshot, + LogicalDecodingContext *decoding_ctx, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, @@ -1637,6 +1640,10 @@ table_relation_copy_data(Relation rel, const RelFileLocator *newrlocator) * not needed for the relation's AM * - *xid_cutoff - ditto * - *multi_cutoff - ditto + * - snapshot - if != NULL, ignore data changes done by transactions that this + * (MVCC) snapshot considers still in-progress or in the future. + * - decoding_ctx - logical decoding context, to capture concurrent data + * changes. * * Output parameters: * - *xid_cutoff - rel's new relfrozenxid value, may be invalid @@ -1649,6 +1656,8 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, Relation OldIndex, bool use_sort, TransactionId OldestXmin, + Snapshot snapshot, + LogicalDecodingContext *decoding_ctx, TransactionId *xid_cutoff, MultiXactId *multi_cutoff, double *num_tuples, @@ -1657,6 +1666,7 @@ table_relation_copy_for_cluster(Relation OldTable, Relation NewTable, { OldTable->rd_tableam->relation_copy_for_cluster(OldTable, NewTable, OldIndex, use_sort, OldestXmin, + snapshot, decoding_ctx, xid_cutoff, multi_cutoff, num_tuples, tups_vacuumed, tups_recently_dead); diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h index 4daa8bef5ee..66431cc19e5 100644 --- a/src/include/catalog/index.h +++ b/src/include/catalog/index.h @@ -100,6 +100,9 @@ extern Oid index_concurrently_create_copy(Relation heapRelation, Oid tablespaceOid, const char *newName); +extern NullableDatum *get_index_stattargets(Oid indexid, + IndexInfo *indInfo); + extern void index_concurrently_build(Oid heapRelationId, Oid indexRelationId); diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index c2976905e4d..569cc2184b3 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -13,10 +13,15 @@ #ifndef CLUSTER_H #define CLUSTER_H +#include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "parser/parse_node.h" +#include "replication/logical.h" #include "storage/lock.h" +#include "storage/relfilelocator.h" #include "utils/relcache.h" +#include "utils/resowner.h" +#include "utils/tuplestore.h" /* flag bits for ClusterParams->options */ @@ -24,6 +29,7 @@ #define CLUOPT_RECHECK 0x02 /* recheck relation state */ #define CLUOPT_RECHECK_ISCLUSTERED 0x04 /* recheck relation state for * indisclustered */ +#define CLUOPT_CONCURRENT 0x08 /* allow concurrent data changes */ /* options for CLUSTER */ typedef struct ClusterParams @@ -46,14 +52,90 @@ typedef enum ClusterCommand CLUSTER_COMMAND_VACUUM } ClusterCommand; +/* + * The following definitions are used by REPACK CONCURRENTLY. + */ + +extern RelFileLocator repacked_rel_locator; +extern RelFileLocator repacked_rel_toast_locator; + +typedef enum +{ + CHANGE_INSERT, + CHANGE_UPDATE_OLD, + CHANGE_UPDATE_NEW, + CHANGE_DELETE, + CHANGE_TRUNCATE +} ConcurrentChangeKind; + +typedef struct ConcurrentChange +{ + /* See the enum above. */ + ConcurrentChangeKind kind; + + /* + * The actual tuple. + * + * The tuple data follows the ConcurrentChange structure. Before use make + * sure the tuple is correctly aligned (ConcurrentChange can be stored as + * bytea) and that tuple->t_data is fixed. + */ + HeapTupleData tup_data; +} ConcurrentChange; + +#define SizeOfConcurrentChange (offsetof(ConcurrentChange, tup_data) + \ + sizeof(HeapTupleData)) + +/* + * Logical decoding state. + * + * Here we store the data changes that we decode from WAL while the table + * contents is being copied to a new storage. Also the necessary metadata + * needed to apply these changes to the table is stored here. + */ +typedef struct RepackDecodingState +{ + /* The relation whose changes we're decoding. */ + Oid relid; + + /* + * Decoded changes are stored here. Although we try to avoid excessive + * batches, it can happen that the changes need to be stored to disk. The + * tuplestore does this transparently. + */ + Tuplestorestate *tstore; + + /* The current number of changes in tstore. */ + double nchanges; + + /* + * Descriptor to store the ConcurrentChange structure serialized (bytea). + * We can't store the tuple directly because tuplestore only supports + * minimum tuple and we may need to transfer OID system column from the + * output plugin. Also we need to transfer the change kind, so it's better + * to put everything in the structure than to use 2 tuplestores "in + * parallel". + */ + TupleDesc tupdesc_change; + + /* Tuple descriptor needed to update indexes. */ + TupleDesc tupdesc; + + /* Slot to retrieve data from tstore. */ + TupleTableSlot *tsslot; + + ResourceOwner resowner; +} RepackDecodingState; + extern void cluster(ParseState *pstate, ClusterStmt *stmt, bool isTopLevel); extern void cluster_rel(Relation OldHeap, Oid indexOid, ClusterParams *params, - ClusterCommand cmd); + ClusterCommand cmd, bool isTopLevel); extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid, LOCKMODE lockmode, ClusterCommand cmd); extern void mark_index_clustered(Relation rel, Oid indexOid, bool is_internal); - +extern void repack_decode_concurrent_changes(LogicalDecodingContext *ctx, + XLogRecPtr end_of_wal); extern Oid make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, Oid NewAccessMethod, char relpersistence, LOCKMODE lockmode); extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, @@ -61,6 +143,7 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, bool swap_toast_by_content, bool check_constraints, bool is_internal, + bool reindex, TransactionId frozenXid, MultiXactId cutoffMulti, char newrelpersistence); diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index 7644267e14f..6b1b1a4c1a7 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -67,10 +67,12 @@ #define PROGRESS_REPACK_PHASE 1 #define PROGRESS_REPACK_INDEX_RELID 2 #define PROGRESS_REPACK_HEAP_TUPLES_SCANNED 3 -#define PROGRESS_REPACK_HEAP_TUPLES_WRITTEN 4 -#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 5 -#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 6 -#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 7 +#define PROGRESS_REPACK_HEAP_TUPLES_INSERTED 4 +#define PROGRESS_REPACK_HEAP_TUPLES_UPDATED 5 +#define PROGRESS_REPACK_HEAP_TUPLES_DELETED 6 +#define PROGRESS_REPACK_TOTAL_HEAP_BLKS 7 +#define PROGRESS_REPACK_HEAP_BLKS_SCANNED 8 +#define PROGRESS_REPACK_INDEX_REBUILD_COUNT 9 /* * Phases of repack (as advertised via PROGRESS_REPACK_PHASE). @@ -83,9 +85,10 @@ #define PROGRESS_REPACK_PHASE_INDEX_SCAN_HEAP 2 #define PROGRESS_REPACK_PHASE_SORT_TUPLES 3 #define PROGRESS_REPACK_PHASE_WRITE_NEW_HEAP 4 -#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 5 -#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 6 -#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 7 +#define PROGRESS_REPACK_PHASE_CATCH_UP 5 +#define PROGRESS_REPACK_PHASE_SWAP_REL_FILES 6 +#define PROGRESS_REPACK_PHASE_REBUILD_INDEX 8 +#define PROGRESS_REPACK_PHASE_FINAL_CLEANUP 8 /* Commands of PROGRESS_REPACK */ #define PROGRESS_REPACK_COMMAND_REPACK 1 diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 4ef76c852f5..de091ceb04a 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3931,6 +3931,7 @@ typedef struct RepackStmt RangeVar *relation; /* relation being repacked */ char *indexname; /* order tuples by this index */ List *params; /* list of DefElem nodes */ + bool concurrent; /* allow concurrent access? */ } RepackStmt; diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 6d4d2d1814c..802fc4b0823 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -73,6 +73,7 @@ extern void FreeSnapshotBuilder(SnapBuild *builder); extern void SnapBuildSnapDecRefcount(Snapshot snap); extern Snapshot SnapBuildInitialSnapshot(SnapBuild *builder); +extern Snapshot SnapBuildInitialSnapshotForRepack(SnapBuild *builder); extern Snapshot SnapBuildMVCCFromHistoric(Snapshot snapshot, bool in_place); extern const char *SnapBuildExportSnapshot(SnapBuild *builder); extern void SnapBuildClearExportedSnapshot(void); diff --git a/src/include/storage/lockdefs.h b/src/include/storage/lockdefs.h index 7f3ba0352f6..2739327b0da 100644 --- a/src/include/storage/lockdefs.h +++ b/src/include/storage/lockdefs.h @@ -36,8 +36,8 @@ typedef int LOCKMODE; #define AccessShareLock 1 /* SELECT */ #define RowShareLock 2 /* SELECT FOR UPDATE/FOR SHARE */ #define RowExclusiveLock 3 /* INSERT, UPDATE, DELETE */ -#define ShareUpdateExclusiveLock 4 /* VACUUM (non-FULL), ANALYZE, CREATE - * INDEX CONCURRENTLY */ +#define ShareUpdateExclusiveLock 4 /* VACUUM (non-exclusive), ANALYZE, CREATE + * INDEX CONCURRENTLY, REPACK CONCURRENTLY */ #define ShareLock 5 /* CREATE INDEX (WITHOUT CONCURRENTLY) */ #define ShareRowExclusiveLock 6 /* like EXCLUSIVE MODE, but allows ROW * SHARE */ diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index 932024b1b0b..fe9d85e5f95 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) +PG_LWLOCK(54, RepackedRels) diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index 147b190210a..5eeabdc6c4f 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -61,6 +61,8 @@ extern Snapshot GetLatestSnapshot(void); extern void SnapshotSetCommandId(CommandId curcid); extern Snapshot CopySnapshot(Snapshot snapshot); +extern void FreeSnapshot(Snapshot snapshot); + extern Snapshot GetCatalogSnapshot(Oid relid); extern Snapshot GetNonHistoricCatalogSnapshot(Oid relid); extern void InvalidateCatalogSnapshot(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 84ca2dc3778..086c61f4ef4 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1969,17 +1969,17 @@ pg_stat_progress_cluster| SELECT s.pid, WHEN 2 THEN 'index scanning heap'::text WHEN 3 THEN 'sorting tuples'::text WHEN 4 THEN 'writing new heap'::text - WHEN 5 THEN 'swapping relation files'::text - WHEN 6 THEN 'rebuilding index'::text - WHEN 7 THEN 'performing final cleanup'::text + WHEN 6 THEN 'swapping relation files'::text + WHEN 7 THEN 'rebuilding index'::text + WHEN 8 THEN 'performing final cleanup'::text ELSE NULL::text END AS phase, (s.param3)::oid AS cluster_index_relid, s.param4 AS heap_tuples_scanned, s.param5 AS heap_tuples_written, - s.param6 AS heap_blks_total, - s.param7 AS heap_blks_scanned, - s.param8 AS index_rebuild_count + s.param8 AS heap_blks_total, + s.param9 AS heap_blks_scanned, + s.param10 AS index_rebuild_count FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_copy| SELECT s.pid, @@ -2055,17 +2055,20 @@ pg_stat_progress_repack| SELECT s.pid, WHEN 2 THEN 'index scanning heap'::text WHEN 3 THEN 'sorting tuples'::text WHEN 4 THEN 'writing new heap'::text - WHEN 5 THEN 'swapping relation files'::text - WHEN 6 THEN 'rebuilding index'::text - WHEN 7 THEN 'performing final cleanup'::text + WHEN 5 THEN 'catch-up'::text + WHEN 6 THEN 'swapping relation files'::text + WHEN 7 THEN 'rebuilding index'::text + WHEN 8 THEN 'performing final cleanup'::text ELSE NULL::text END AS phase, (s.param3)::oid AS repack_index_relid, s.param4 AS heap_tuples_scanned, - s.param5 AS heap_tuples_written, - s.param6 AS heap_blks_total, - s.param7 AS heap_blks_scanned, - s.param8 AS index_rebuild_count + s.param5 AS heap_tuples_inserted, + s.param6 AS heap_tuples_updated, + s.param7 AS heap_tuples_deleted, + s.param8 AS heap_blks_total, + s.param9 AS heap_blks_scanned, + s.param10 AS index_rebuild_count FROM (pg_stat_get_progress_info('REPACK'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_progress_vacuum| SELECT s.pid, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index c7ea8fb93ca..e89db0a2ee7 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -477,6 +477,8 @@ CompressFileHandle CompressionLocation CompressorState ComputeXidHorizonsResult +ConcurrentChange +ConcurrentChangeKind ConditionVariable ConditionVariableMinimallyPadded ConditionalStack @@ -1239,6 +1241,7 @@ IndexElem IndexFetchHeapData IndexFetchTableData IndexInfo +IndexInsertState IndexList IndexOnlyScan IndexOnlyScanState @@ -2507,6 +2510,7 @@ ReorderBufferTupleCidKey ReorderBufferUpdateProgressTxnCB ReorderTuple RepOriginId +RepackDecodingState RepackStmt ReparameterizeForeignPathByChild_function ReplaceVarsFromTargetList_context -- 2.43.5