From 100d413e46c67d00c35b3dccc1ee28098a6c3f08 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 28 Jun 2024 21:09:33 -0500 Subject: [PATCH v7 03/11] use new pg_upgrade async API for retrieving relinfos --- src/bin/pg_upgrade/info.c | 238 ++++++++++++++++++-------------------- 1 file changed, 110 insertions(+), 128 deletions(-) diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 5de5e10945..30a6d30284 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -11,6 +11,7 @@ #include "access/transam.h" #include "catalog/pg_class_d.h" +#include "pqexpbuffer.h" #include "pg_upgrade.h" static void create_rel_filename_map(const char *old_data, const char *new_data, @@ -22,12 +23,14 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db, static void free_db_and_rel_infos(DbInfoArr *db_arr); static void get_template0_info(ClusterInfo *cluster); static void get_db_infos(ClusterInfo *cluster); -static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo); +static char *get_rel_infos_query(void *arg); +static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg); static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo); +static char *get_old_cluster_logical_slot_infos_query(void *arg); +static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg); /* @@ -276,7 +279,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) void get_db_rel_and_slot_infos(ClusterInfo *cluster) { - int dbnum; + AsyncTask *task = async_task_create(); if (cluster->dbarr.dbs != NULL) free_db_and_rel_infos(&cluster->dbarr); @@ -284,15 +287,19 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) get_template0_info(cluster); get_db_infos(cluster); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) - { - DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum]; - - get_rel_infos(cluster, pDbInfo); + async_task_add_step(task, + get_rel_infos_query, + get_rel_infos_result, + true, NULL); + if (cluster == &old_cluster && + GET_MAJOR_VERSION(cluster->major_version) > 1600) + async_task_add_step(task, + get_old_cluster_logical_slot_infos_query, + get_old_cluster_logical_slot_infos_result, + true, NULL); - if (cluster == &old_cluster) - get_old_cluster_logical_slot_infos(pDbInfo); - } + async_task_run(task, cluster); + async_task_free(task); if (cluster == &old_cluster) pg_log(PG_VERBOSE, "\nsource databases:"); @@ -439,32 +446,12 @@ get_db_infos(ClusterInfo *cluster) * Note: the resulting RelInfo array is assumed to be sorted by OID. * This allows later processing to match up old and new databases efficiently. */ -static void -get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) +static char * +get_rel_infos_query(void *arg) { - PGconn *conn = connectToServer(cluster, - dbinfo->db_name); - PGresult *res; - RelInfo *relinfos; - int ntups; - int relnum; - int num_rels = 0; - char *nspname = NULL; - char *relname = NULL; - char *tablespace = NULL; - int i_spclocation, - i_nspname, - i_relname, - i_reloid, - i_indtable, - i_toastheap, - i_relfilenumber, - i_reltablespace; - char query[QUERY_ALLOC]; - char *last_namespace = NULL, - *last_tablespace = NULL; + PQExpBufferData query; - query[0] = '\0'; /* initialize query string to empty */ + initPQExpBuffer(&query); /* * Create a CTE that collects OIDs of regular user tables and matviews, @@ -476,34 +463,34 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * output, so we have to copy that system table. It's easiest to do that * by treating it as a user table. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "WITH regular_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.oid, 0::oid, 0::oid " - " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " - CppAsString2(RELKIND_MATVIEW) ") AND " + appendPQExpBuffer(&query, + "WITH regular_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.oid, 0::oid, 0::oid " + " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " WHERE relkind IN (" CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ") AND " /* exclude possible orphaned temp tables */ - " ((n.nspname !~ '^pg_temp_' AND " - " n.nspname !~ '^pg_toast_temp_' AND " - " n.nspname NOT IN ('pg_catalog', 'information_schema', " - " 'binary_upgrade', 'pg_toast') AND " - " c.oid >= %u::pg_catalog.oid) OR " - " (n.nspname = 'pg_catalog' AND " - " relname IN ('pg_largeobject') ))), ", - FirstNormalObjectId); + " ((n.nspname !~ '^pg_temp_' AND " + " n.nspname !~ '^pg_toast_temp_' AND " + " n.nspname NOT IN ('pg_catalog', 'information_schema', " + " 'binary_upgrade', 'pg_toast') AND " + " c.oid >= %u::pg_catalog.oid) OR " + " (n.nspname = 'pg_catalog' AND " + " relname IN ('pg_largeobject') ))), ", + FirstNormalObjectId); /* * Add a CTE that collects OIDs of toast tables belonging to the tables * selected by the regular_heap CTE. (We have to do this separately * because the namespace-name rules above don't work for toast tables.) */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " toast_heap (reloid, indtable, toastheap) AS ( " - " SELECT c.reltoastrelid, 0::oid, c.oid " - " FROM regular_heap JOIN pg_catalog.pg_class c " - " ON regular_heap.reloid = c.oid " - " WHERE c.reltoastrelid != 0), "); + appendPQExpBufferStr(&query, + " toast_heap (reloid, indtable, toastheap) AS ( " + " SELECT c.reltoastrelid, 0::oid, c.oid " + " FROM regular_heap JOIN pg_catalog.pg_class c " + " ON regular_heap.reloid = c.oid " + " WHERE c.reltoastrelid != 0), "); /* * Add a CTE that collects OIDs of all valid indexes on the previously @@ -511,53 +498,61 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * Testing indisready is necessary in 9.2, and harmless in earlier/later * versions. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - " all_index (reloid, indtable, toastheap) AS ( " - " SELECT indexrelid, indrelid, 0::oid " - " FROM pg_catalog.pg_index " - " WHERE indisvalid AND indisready " - " AND indrelid IN " - " (SELECT reloid FROM regular_heap " - " UNION ALL " - " SELECT reloid FROM toast_heap)) "); + appendPQExpBufferStr(&query, + " all_index (reloid, indtable, toastheap) AS ( " + " SELECT indexrelid, indrelid, 0::oid " + " FROM pg_catalog.pg_index " + " WHERE indisvalid AND indisready " + " AND indrelid IN " + " (SELECT reloid FROM regular_heap " + " UNION ALL " + " SELECT reloid FROM toast_heap)) "); /* * And now we can write the query that retrieves the data we want for each * heap and index relation. Make sure result is sorted by OID. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), - "SELECT all_rels.*, n.nspname, c.relname, " - " c.relfilenode, c.reltablespace, " - " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " - "FROM (SELECT * FROM regular_heap " - " UNION ALL " - " SELECT * FROM toast_heap " - " UNION ALL " - " SELECT * FROM all_index) all_rels " - " JOIN pg_catalog.pg_class c " - " ON all_rels.reloid = c.oid " - " JOIN pg_catalog.pg_namespace n " - " ON c.relnamespace = n.oid " - " LEFT OUTER JOIN pg_catalog.pg_tablespace t " - " ON c.reltablespace = t.oid " - "ORDER BY 1;"); - - res = executeQueryOrDie(conn, "%s", query); - - ntups = PQntuples(res); - - relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + appendPQExpBufferStr(&query, + "SELECT all_rels.*, n.nspname, c.relname, " + " c.relfilenode, c.reltablespace, " + " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " + "FROM (SELECT * FROM regular_heap " + " UNION ALL " + " SELECT * FROM toast_heap " + " UNION ALL " + " SELECT * FROM all_index) all_rels " + " JOIN pg_catalog.pg_class c " + " ON all_rels.reloid = c.oid " + " JOIN pg_catalog.pg_namespace n " + " ON c.relnamespace = n.oid " + " LEFT OUTER JOIN pg_catalog.pg_tablespace t " + " ON c.reltablespace = t.oid " + "ORDER BY 1;"); + + return query.data; +} - i_reloid = PQfnumber(res, "reloid"); - i_indtable = PQfnumber(res, "indtable"); - i_toastheap = PQfnumber(res, "toastheap"); - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_relfilenumber = PQfnumber(res, "relfilenode"); - i_reltablespace = PQfnumber(res, "reltablespace"); - i_spclocation = PQfnumber(res, "spclocation"); +static void +get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int ntups = PQntuples(res); + RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + int i_reloid = PQfnumber(res, "reloid"); + int i_indtable = PQfnumber(res, "indtable"); + int i_toastheap = PQfnumber(res, "toastheap"); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + int i_relfilenumber = PQfnumber(res, "relfilenode"); + int i_reltablespace = PQfnumber(res, "reltablespace"); + int i_spclocation = PQfnumber(res, "spclocation"); + int num_rels = 0; + char *nspname = NULL; + char *relname = NULL; + char *tablespace = NULL; + char *last_namespace = NULL; + char *last_tablespace = NULL; - for (relnum = 0; relnum < ntups; relnum++) + for (int relnum = 0; relnum < ntups; relnum++) { RelInfo *curr = &relinfos[num_rels++]; @@ -610,9 +605,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) /* A zero reltablespace oid indicates the database tablespace. */ curr->tablespace = dbinfo->db_tablespace; } - PQclear(res); - - PQfinish(conn); dbinfo->rel_arr.rels = relinfos; dbinfo->rel_arr.nrels = num_rels; @@ -634,20 +626,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots * are included. */ -static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo) +static char * +get_old_cluster_logical_slot_infos_query(void *arg) { - PGconn *conn; - PGresult *res; - LogicalSlotInfo *slotinfos = NULL; - int num_slots; - - /* Logical slots can be migrated since PG17. */ - if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) - return; - - conn = connectToServer(&old_cluster, dbinfo->db_name); - /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This @@ -665,18 +646,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? "FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); - - num_slots = PQntuples(res); + return psprintf("SELECT slot_name, plugin, two_phase, failover, " + "%s as caught_up, invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE;", + user_opts.live_check ? "FALSE" : + "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " + "END)"); +} + +static void +get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + LogicalSlotInfo *slotinfos = NULL; + int num_slots = PQntuples(res); if (num_slots) { @@ -709,14 +695,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) } } - PQclear(res); - PQfinish(conn); - dbinfo->slot_arr.slots = slotinfos; dbinfo->slot_arr.nslots = num_slots; } - /* * count_old_cluster_logical_slots() * -- 2.39.3 (Apple Git-146)