From 75ff88b58a1b98429a253f4f45efc28c529dc6e7 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 28 Jun 2024 21:09:33 -0500 Subject: [PATCH v12 03/11] Use pg_upgrade's new parallel framework to get relation info. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/info.c | 296 ++++++++++++++++++++------------------ 1 file changed, 154 insertions(+), 142 deletions(-) diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index d3c1e8918d..2bfc8dcfba 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -11,6 +11,7 @@ #include "access/transam.h" #include "catalog/pg_class_d.h" +#include "pqexpbuffer.h" #include "pg_upgrade.h" static void create_rel_filename_map(const char *old_data, const char *new_data, @@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db, static void free_db_and_rel_infos(DbInfoArr *db_arr); static void get_template0_info(ClusterInfo *cluster); static void get_db_infos(ClusterInfo *cluster); -static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo); +static char *get_rel_infos_query(void); +static void process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg); static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo); +static char *get_old_cluster_logical_slot_infos_query(void); +static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg); /* @@ -276,7 +279,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) void get_db_rel_and_slot_infos(ClusterInfo *cluster) { - int dbnum; + UpgradeTask *task = upgrade_task_create(); + char *rel_infos_query = NULL; + char *logical_slot_infos_query = NULL; if (cluster->dbarr.dbs != NULL) free_db_and_rel_infos(&cluster->dbarr); @@ -284,15 +289,37 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) get_template0_info(cluster); get_db_infos(cluster); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + rel_infos_query = get_rel_infos_query(); + upgrade_task_add_step(task, + rel_infos_query, + process_rel_infos, + true, NULL); + + /* + * Logical slots are only carried over to the new cluster when the old + * cluster is on PG17 or newer. This is because before that the logical + * slots are not saved at shutdown, so there is no guarantee that the + * latest confirmed_flush_lsn is saved to disk which can lead to data + * loss. It is still not guaranteed for manually created slots in PG17, so + * subsequent checks done in check_old_cluster_for_valid_slots() would + * raise a FATAL error if such slots are included. + */ + if (cluster == &old_cluster && + GET_MAJOR_VERSION(cluster->major_version) > 1600) { - DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum]; + logical_slot_infos_query = get_old_cluster_logical_slot_infos_query(); + upgrade_task_add_step(task, + logical_slot_infos_query, + process_old_cluster_logical_slot_infos, + true, NULL); + } - get_rel_infos(cluster, pDbInfo); + upgrade_task_run(task, cluster); + upgrade_task_free(task); - if (cluster == &old_cluster) - get_old_cluster_logical_slot_infos(pDbInfo); - } + pg_free(rel_infos_query); + if (logical_slot_infos_query) + pg_free(logical_slot_infos_query); if (cluster == &old_cluster) pg_log(PG_VERBOSE, "\nsource databases:"); @@ -431,40 +458,21 @@ get_db_infos(ClusterInfo *cluster) /* - * get_rel_infos() + * get_rel_infos_query() * - * gets the relinfos for all the user tables and indexes of the database - * referred to by "dbinfo". + * Returns the query for retrieving the relation information for all the user + * tables and indexes in the database, for use by get_db_rel_and_slot_infos()'s + * UpgradeTask. * - * Note: the resulting RelInfo array is assumed to be sorted by OID. - * This allows later processing to match up old and new databases efficiently. + * Note: the result is assumed to be sorted by OID. This allows later + * processing to match up old and new databases efficiently. */ -static void -get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) +static char * +get_rel_infos_query(void) { - PGconn *conn = connectToServer(cluster, - dbinfo->db_name); - PGresult *res; - RelInfo *relinfos; - int ntups; - int relnum; - int num_rels = 0; - char *nspname = NULL; - char *relname = NULL; - char *tablespace = NULL; - int i_spclocation, - i_nspname, - i_relname, - i_reloid, - i_indtable, - i_toastheap, - i_relfilenumber, - i_reltablespace; - char query[QUERY_ALLOC]; - char *last_namespace = NULL, - *last_tablespace = NULL; + PQExpBufferData query; - query[0] = '\0'; /* initialize query string to empty */ + initPQExpBuffer(&query); /* * Create a CTE that collects OIDs of regular user tables and matviews, @@ -476,34 +484,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * output, so we have to copy that system table. It's easiest to do that * by treating it as a user table. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "WITH regular_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.oid, 0::oid, 0::oid " - " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " - CppAsString2(RELKIND_MATVIEW) ") AND " + appendPQExpBuffer(&query, + "WITH regular_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.oid, 0::oid, 0::oid " + " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ") AND " /* exclude possible orphaned temp tables */ - " ((n.nspname !~ '^pg_temp_' AND " - " n.nspname !~ '^pg_toast_temp_' AND " - " n.nspname NOT IN ('pg_catalog', 'information_schema', " - " 'binary_upgrade', 'pg_toast') AND " - " c.oid >= %u::pg_catalog.oid) OR " - " (n.nspname = 'pg_catalog' AND " - " relname IN ('pg_largeobject') ))), ", - FirstNormalObjectId); + " ((n.nspname !~ '^pg_temp_' AND " + " n.nspname !~ '^pg_toast_temp_' AND " + " n.nspname NOT IN ('pg_catalog', 'information_schema', " + " 'binary_upgrade', 'pg_toast') AND " + " c.oid >= %u::pg_catalog.oid) OR " + " (n.nspname = 'pg_catalog' AND " + " relname IN ('pg_largeobject') ))), ", + FirstNormalObjectId); /* * Add a CTE that collects OIDs of toast tables belonging to the tables * selected by the regular_heap CTE. (We have to do this separately * because the namespace-name rules above don't work for toast tables.) */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " toast_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.reltoastrelid, 0::oid, c.oid " - " FROM regular_heap JOIN pg_catalog.pg_class c " - " ON regular_heap.reloid = c.oid " - " WHERE c.reltoastrelid != 0), "); + appendPQExpBufferStr(&query, + " toast_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.reltoastrelid, 0::oid, c.oid " + " FROM regular_heap JOIN pg_catalog.pg_class c " + " ON regular_heap.reloid = c.oid " + " WHERE c.reltoastrelid != 0), "); /* * Add a CTE that collects OIDs of all valid indexes on the previously @@ -511,53 +519,68 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * Testing indisready is necessary in 9.2, and harmless in earlier/later * versions. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " all_index (reloid, indtable, toastheap) AS ( " - " SELECT indexrelid, indrelid, 0::oid " - " FROM pg_catalog.pg_index " - " WHERE indisvalid AND indisready " - " AND indrelid IN " - " (SELECT reloid FROM regular_heap " - " UNION ALL " - " SELECT reloid FROM toast_heap)) "); + appendPQExpBufferStr(&query, + " all_index (reloid, indtable, toastheap) AS ( " + " SELECT indexrelid, indrelid, 0::oid " + " FROM pg_catalog.pg_index " + " WHERE indisvalid AND indisready " + " AND indrelid IN " + " (SELECT reloid FROM regular_heap " + " UNION ALL " + " SELECT reloid FROM toast_heap)) "); /* * And now we can write the query that retrieves the data we want for each * heap and index relation. Make sure result is sorted by OID. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "SELECT all_rels.*, n.nspname, c.relname, " - " c.relfilenode, c.reltablespace, " - " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " - "FROM (SELECT * FROM regular_heap " - " UNION ALL " - " SELECT * FROM toast_heap " - " UNION ALL " - " SELECT * FROM all_index) all_rels " - " JOIN pg_catalog.pg_class c " - " ON all_rels.reloid = c.oid " - " JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " LEFT OUTER JOIN pg_catalog.pg_tablespace t " - " ON c.reltablespace = t.oid " - "ORDER BY 1;"); - - res = executeQueryOrDie(conn, "%s", query); - - ntups = PQntuples(res); + appendPQExpBufferStr(&query, + "SELECT all_rels.*, n.nspname, c.relname, " + " c.relfilenode, c.reltablespace, " + " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " + "FROM (SELECT * FROM regular_heap " + " UNION ALL " + " SELECT * FROM toast_heap " + " UNION ALL " + " SELECT * FROM all_index) all_rels " + " JOIN pg_catalog.pg_class c " + " ON all_rels.reloid = c.oid " + " JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " LEFT OUTER JOIN pg_catalog.pg_tablespace t " + " ON c.reltablespace = t.oid " + "ORDER BY 1;"); + + return query.data; +} - relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); +/* + * Callback function for processing results of the query returned by + * get_rel_infos_query(), which is used for get_db_rel_and_slot_infos()'s + * UpgradeTask. This function stores the relation information for later use. + */ +static void +process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int ntups = PQntuples(res); + RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + int i_reloid = PQfnumber(res, "reloid"); + int i_indtable = PQfnumber(res, "indtable"); + int i_toastheap = PQfnumber(res, "toastheap"); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + int i_relfilenumber = PQfnumber(res, "relfilenode"); + int i_reltablespace = PQfnumber(res, "reltablespace"); + int i_spclocation = PQfnumber(res, "spclocation"); + int num_rels = 0; + char *nspname = NULL; + char *relname = NULL; + char *tablespace = NULL; + char *last_namespace = NULL; + char *last_tablespace = NULL; - i_reloid = PQfnumber(res, "reloid"); - i_indtable = PQfnumber(res, "indtable"); - i_toastheap = PQfnumber(res, "toastheap"); - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_relfilenumber = PQfnumber(res, "relfilenode"); - i_reltablespace = PQfnumber(res, "reltablespace"); - i_spclocation = PQfnumber(res, "spclocation"); + AssertVariableIsOfType(&process_rel_infos, UpgradeTaskProcessCB); - for (relnum = 0; relnum < ntups; relnum++) + for (int relnum = 0; relnum < ntups; relnum++) { RelInfo *curr = &relinfos[num_rels++]; @@ -610,44 +633,22 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) /* A zero reltablespace oid indicates the database tablespace. */ curr->tablespace = dbinfo->db_tablespace; } - PQclear(res); - - PQfinish(conn); dbinfo->rel_arr.rels = relinfos; dbinfo->rel_arr.nrels = num_rels; } /* - * get_old_cluster_logical_slot_infos() - * - * Gets the LogicalSlotInfos for all the logical replication slots of the - * database referred to by "dbinfo". The status of each logical slot is gotten - * here, but they are used at the checking phase. See - * check_old_cluster_for_valid_slots(). + * get_old_cluster_logical_slot_infos_query() * - * Note: This function will not do anything if the old cluster is pre-PG17. - * This is because before that the logical slots are not saved at shutdown, so - * there is no guarantee that the latest confirmed_flush_lsn is saved to disk - * which can lead to data loss. It is still not guaranteed for manually created - * slots in PG17, so subsequent checks done in - * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots - * are included. + * Returns the query for retrieving the logical slot information for all the + * logical replication slots in the database, for use by + * get_db_rel_and_slot_infos()'s UpgradeTask. The status of each logical slot + * is checked in check_old_cluster_for_valid_slots(). */ -static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo) +static char * +get_old_cluster_logical_slot_infos_query(void) { - PGconn *conn; - PGresult *res; - LogicalSlotInfo *slotinfos = NULL; - int num_slots; - - /* Logical slots can be migrated since PG17. */ - if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) - return; - - conn = connectToServer(&old_cluster, dbinfo->db_name); - /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This @@ -665,18 +666,32 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? "FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); - - num_slots = PQntuples(res); + return psprintf("SELECT slot_name, plugin, two_phase, failover, " + "%s as caught_up, invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE;", + user_opts.live_check ? "FALSE" : + "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " + "END)"); +} + +/* + * Callback function for processing results of the query returned by + * get_old_cluster_logical_slot_infos_query(), which is used for + * get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical + * slot information for later use. + */ +static void +process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg) +{ + LogicalSlotInfo *slotinfos = NULL; + int num_slots = PQntuples(res); + + AssertVariableIsOfType(&process_old_cluster_logical_slot_infos, + UpgradeTaskProcessCB); if (num_slots) { @@ -709,9 +724,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) } } - PQclear(res); - PQfinish(conn); - dbinfo->slot_arr.slots = slotinfos; dbinfo->slot_arr.nslots = num_slots; } -- 2.39.3 (Apple Git-146)