From 7b72cf76faba3532499e0f4d6ba1f5035f3fe1b6 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 28 Jun 2024 21:09:33 -0500 Subject: [PATCH v3 4/7] use new pg_upgrade async API for retrieving relinfos --- src/bin/pg_upgrade/info.c | 187 +++++++++++++++++--------------------- 1 file changed, 81 insertions(+), 106 deletions(-) diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 8f1777de59..d07255bd0a 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -22,13 +22,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db, static void free_db_and_rel_infos(DbInfoArr *db_arr); static void get_template0_info(ClusterInfo *cluster); static void get_db_infos(ClusterInfo *cluster); -static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo); +static char *get_rel_infos_query(DbInfo *dbinfo, void *arg); +static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg); static void free_rel_infos(RelInfoArr *rel_arr); static void print_db_infos(DbInfoArr *db_arr); static void print_rel_infos(RelInfoArr *rel_arr); static void print_slot_infos(LogicalSlotInfoArr *slot_arr); -static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo); -static void get_db_subscription_count(DbInfo *dbinfo); +static char *get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg); +static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg); +static char *get_db_subscription_count_query(DbInfo *dbinfo, void *arg); +static void get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg); /* @@ -277,7 +280,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db) void get_db_rel_and_slot_infos(ClusterInfo *cluster) { - int dbnum; + AsyncTask *task = async_task_create(); if (cluster->dbarr.dbs != NULL) free_db_and_rel_infos(&cluster->dbarr); @@ -285,23 +288,26 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster) get_template0_info(cluster); get_db_infos(cluster); - for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + async_task_add_step(task, + get_rel_infos_query, + get_rel_infos_result, + true, NULL); + if (cluster == &old_cluster && + GET_MAJOR_VERSION(cluster->major_version) > 1600) { - DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum]; - - get_rel_infos(cluster, pDbInfo); - - /* - * Retrieve the logical replication slots infos and the subscriptions - * count for the old cluster. - */ - if (cluster == &old_cluster) - { - get_old_cluster_logical_slot_infos(pDbInfo); - get_db_subscription_count(pDbInfo); - } + async_task_add_step(task, + get_old_cluster_logical_slot_infos_query, + get_old_cluster_logical_slot_infos_result, + true, cluster); + async_task_add_step(task, + get_db_subscription_count_query, + get_db_subscription_count_result, + true, cluster); } + async_task_run(task, cluster); + async_task_free(task); + if (cluster == &old_cluster) pg_log(PG_VERBOSE, "\nsource databases:"); else @@ -447,30 +453,10 @@ get_db_infos(ClusterInfo *cluster) * Note: the resulting RelInfo array is assumed to be sorted by OID. * This allows later processing to match up old and new databases efficiently. */ -static void -get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) +static char * +get_rel_infos_query(DbInfo *dbinfo, void *arg) { - PGconn *conn = connectToServer(cluster, - dbinfo->db_name); - PGresult *res; - RelInfo *relinfos; - int ntups; - int relnum; - int num_rels = 0; - char *nspname = NULL; - char *relname = NULL; - char *tablespace = NULL; - int i_spclocation, - i_nspname, - i_relname, - i_reloid, - i_indtable, - i_toastheap, - i_relfilenumber, - i_reltablespace; - char query[QUERY_ALLOC]; - char *last_namespace = NULL, - *last_tablespace = NULL; + char *query = pg_malloc(QUERY_ALLOC); query[0] = '\0'; /* initialize query string to empty */ @@ -484,7 +470,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * output, so we have to copy that system table. It's easiest to do that * by treating it as a user table. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), + snprintf(query + strlen(query), QUERY_ALLOC - strlen(query), "WITH regular_heap (reloid, indtable, toastheap) AS ( " " SELECT c.oid, 0::oid, 0::oid " " FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n " @@ -506,7 +492,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * selected by the regular_heap CTE. (We have to do this separately * because the namespace-name rules above don't work for toast tables.) */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), + snprintf(query + strlen(query), QUERY_ALLOC - strlen(query), " toast_heap (reloid, indtable, toastheap) AS ( " " SELECT c.reltoastrelid, 0::oid, c.oid " " FROM regular_heap JOIN pg_catalog.pg_class c " @@ -519,7 +505,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * Testing indisready is necessary in 9.2, and harmless in earlier/later * versions. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), + snprintf(query + strlen(query), QUERY_ALLOC - strlen(query), " all_index (reloid, indtable, toastheap) AS ( " " SELECT indexrelid, indrelid, 0::oid " " FROM pg_catalog.pg_index " @@ -533,7 +519,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * And now we can write the query that retrieves the data we want for each * heap and index relation. Make sure result is sorted by OID. */ - snprintf(query + strlen(query), sizeof(query) - strlen(query), + snprintf(query + strlen(query), QUERY_ALLOC - strlen(query), "SELECT all_rels.*, n.nspname, c.relname, " " c.relfilenode, c.reltablespace, " " pg_catalog.pg_tablespace_location(t.oid) AS spclocation " @@ -550,22 +536,30 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) " ON c.reltablespace = t.oid " "ORDER BY 1;"); - res = executeQueryOrDie(conn, "%s", query); - - ntups = PQntuples(res); - - relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + return query; +} - i_reloid = PQfnumber(res, "reloid"); - i_indtable = PQfnumber(res, "indtable"); - i_toastheap = PQfnumber(res, "toastheap"); - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_relfilenumber = PQfnumber(res, "relfilenode"); - i_reltablespace = PQfnumber(res, "reltablespace"); - i_spclocation = PQfnumber(res, "spclocation"); +static void +get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int ntups = PQntuples(res); + RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups); + int i_reloid = PQfnumber(res, "reloid"); + int i_indtable = PQfnumber(res, "indtable"); + int i_toastheap = PQfnumber(res, "toastheap"); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + int i_relfilenumber = PQfnumber(res, "relfilenode"); + int i_reltablespace = PQfnumber(res, "reltablespace"); + int i_spclocation = PQfnumber(res, "spclocation"); + int num_rels = 0; + char *nspname = NULL; + char *relname = NULL; + char *tablespace = NULL; + char *last_namespace = NULL; + char *last_tablespace = NULL; - for (relnum = 0; relnum < ntups; relnum++) + for (int relnum = 0; relnum < ntups; relnum++) { RelInfo *curr = &relinfos[num_rels++]; @@ -618,9 +612,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) /* A zero reltablespace oid indicates the database tablespace. */ curr->tablespace = dbinfo->db_tablespace; } - PQclear(res); - - PQfinish(conn); dbinfo->rel_arr.rels = relinfos; dbinfo->rel_arr.nrels = num_rels; @@ -642,20 +633,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo) * check_old_cluster_for_valid_slots() would raise a FATAL error if such slots * are included. */ -static void -get_old_cluster_logical_slot_infos(DbInfo *dbinfo) +static char * +get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg) { - PGconn *conn; - PGresult *res; - LogicalSlotInfo *slotinfos = NULL; - int num_slots; - - /* Logical slots can be migrated since PG17. */ - if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600) - return; - - conn = connectToServer(&old_cluster, dbinfo->db_name); - /* * Fetch the logical replication slot information. The check whether the * slot is considered caught up is done by an upgrade function. This @@ -673,18 +653,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " - "FROM pg_catalog.pg_replication_slots " - "WHERE slot_type = 'logical' AND " - "database = current_database() AND " - "temporary IS FALSE;", - user_opts.live_check ? "FALSE" : - "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " - "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " - "END)"); - - num_slots = PQntuples(res); + return psprintf("SELECT slot_name, plugin, two_phase, failover, " + "%s as caught_up, invalidation_reason IS NOT NULL as invalid " + "FROM pg_catalog.pg_replication_slots " + "WHERE slot_type = 'logical' AND " + "database = current_database() AND " + "temporary IS FALSE;", + user_opts.live_check ? "FALSE" : + "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE " + "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) " + "END)"); +} + +static void +get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + LogicalSlotInfo *slotinfos = NULL; + int num_slots = PQntuples(res); if (num_slots) { @@ -717,14 +702,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo) } } - PQclear(res); - PQfinish(conn); - dbinfo->slot_arr.slots = slotinfos; dbinfo->slot_arr.nslots = num_slots; } - /* * count_old_cluster_logical_slots() * @@ -754,24 +735,18 @@ count_old_cluster_logical_slots(void) * This is because before that the logical slots are not upgraded, so we will * not be able to upgrade the logical replication clusters completely. */ -static void -get_db_subscription_count(DbInfo *dbinfo) +static char * +get_db_subscription_count_query(DbInfo *dbinfo, void *arg) { - PGconn *conn; - PGresult *res; - - /* Subscriptions can be migrated since PG17. */ - if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700) - return; + return psprintf("SELECT count(*) " + "FROM pg_catalog.pg_subscription WHERE subdbid = %u", + dbinfo->db_oid); +} - conn = connectToServer(&old_cluster, dbinfo->db_name); - res = executeQueryOrDie(conn, "SELECT count(*) " - "FROM pg_catalog.pg_subscription WHERE subdbid = %u", - dbinfo->db_oid); +static void +get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0)); - - PQclear(res); - PQfinish(conn); } /* -- 2.39.3 (Apple Git-146)