From d696ef52ff8ac8bbbe65e4ea6e5554606cbca19d Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 28 Jun 2024 21:24:35 -0500 Subject: [PATCH v8 04/11] use new pg_upgrade async API to parallelize getting loadable libraries --- src/bin/pg_upgrade/function.c | 49 +++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c index 7e3abed098..75e5ebb2c8 100644 --- a/src/bin/pg_upgrade/function.c +++ b/src/bin/pg_upgrade/function.c @@ -42,6 +42,20 @@ library_name_compare(const void *p1, const void *p2) ((const LibraryInfo *) p2)->dbnum); } +struct loadable_libraries_state +{ + PGresult **ress; + int totaltups; +}; + +static void +get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg) +{ + struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg; + + state->ress[dbinfo - old_cluster.dbarr.dbs] = res; + state->totaltups += PQntuples(res); +} /* * get_loadable_libraries() @@ -54,47 +68,41 @@ library_name_compare(const void *p1, const void *p2) void get_loadable_libraries(void) { - PGresult **ress; int totaltups; int dbnum; int n_libinfos; + AsyncTask *task = async_task_create(); + struct loadable_libraries_state state; + char *loadable_libraries_query; - ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *)); - totaltups = 0; - - /* Fetch all library names, removing duplicates within each DB */ - for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(&old_cluster, active_db->db_name); + state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *)); + state.totaltups = 0; - /* - * Fetch all libraries containing non-built-in C functions in this DB. - */ - ress[dbnum] = executeQueryOrDie(conn, - "SELECT DISTINCT probin " + loadable_libraries_query = psprintf("SELECT DISTINCT probin " "FROM pg_catalog.pg_proc " "WHERE prolang = %u AND " "probin IS NOT NULL AND " "oid >= %u;", ClanguageId, FirstNormalObjectId); - totaltups += PQntuples(ress[dbnum]); - PQfinish(conn); - } + async_task_add_step(task, loadable_libraries_query, + get_loadable_libraries_result, false, &state); + + async_task_run(task, &old_cluster); + async_task_free(task); /* * Allocate memory for required libraries and logical replication output * plugins. */ - n_libinfos = totaltups + count_old_cluster_logical_slots(); + n_libinfos = state.totaltups + count_old_cluster_logical_slots(); os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos); totaltups = 0; for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) { - PGresult *res = ress[dbnum]; + PGresult *res = state.ress[dbnum]; int ntups; int rowno; LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr; @@ -129,7 +137,8 @@ get_loadable_libraries(void) } } - pg_free(ress); + pg_free(state.ress); + pg_free(loadable_libraries_query); os_info.num_libraries = totaltups; } -- 2.39.3 (Apple Git-146)