From c6122d5b94375ebda9ebe7c2f78fb73b44780b78 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Sat, 6 Jul 2024 21:06:31 -0500 Subject: [PATCH v4 07/12] parallelize data type checks in pg_upgrade --- src/bin/pg_upgrade/check.c | 319 +++++++++++++++++++------------------ 1 file changed, 160 insertions(+), 159 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 251f3d9017..800d944218 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -32,6 +32,10 @@ static void check_new_cluster_subscription_configuration(void); static void check_old_cluster_for_valid_slots(void); static void check_old_cluster_subscription_state(void); +static bool *data_type_check_results; +static bool data_type_check_failed; +static PQExpBufferData data_type_check_report; + /* * DataTypesUsageChecks - definitions of data type checks for the old cluster * in order to determine if an upgrade can be performed. See the comment on @@ -314,6 +318,129 @@ static DataTypesUsageChecks data_types_usage_checks[] = } }; +static char * +data_type_check_query(DbInfo *dbinfo, void *arg) +{ + int i = (int) (intptr_t) arg; + DataTypesUsageChecks *check = &data_types_usage_checks[i]; + + return psprintf("WITH RECURSIVE oids AS ( " + /* start with the type(s) returned by base_query */ + " %s " + " UNION ALL " + " SELECT * FROM ( " + /* inner WITH because we can only reference the CTE once */ + " WITH x AS (SELECT oid FROM oids) " + /* domains on any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' " + " UNION ALL " + /* arrays over any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' " + " UNION ALL " + /* composite types containing any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x " + " WHERE t.typtype = 'c' AND " + " t.oid = c.reltype AND " + " c.oid = a.attrelid AND " + " NOT a.attisdropped AND " + " a.atttypid = x.oid " + " UNION ALL " + /* ranges containing any type selected so far */ + " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x " + " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid" + " ) foo " + ") " + /* now look for stored columns of any such type */ + "SELECT n.nspname, c.relname, a.attname " + "FROM pg_catalog.pg_class c, " + " pg_catalog.pg_namespace n, " + " pg_catalog.pg_attribute a " + "WHERE c.oid = a.attrelid AND " + " NOT a.attisdropped AND " + " a.atttypid IN (SELECT oid FROM oids) AND " + " c.relkind IN (" + CppAsString2(RELKIND_RELATION) ", " + CppAsString2(RELKIND_MATVIEW) ", " + CppAsString2(RELKIND_INDEX) ") AND " + " c.relnamespace = n.oid AND " + /* exclude possible orphaned temp tables */ + " n.nspname !~ '^pg_temp_' AND " + " n.nspname !~ '^pg_toast_temp_' AND " + /* exclude system catalogs, too */ + " n.nspname NOT IN ('pg_catalog', 'information_schema')", + check->base_query); +} + +static void +data_type_check_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + int i = (int) (intptr_t) arg; + DataTypesUsageChecks *check = &data_types_usage_checks[i]; + int ntups = PQntuples(res); + + if (ntups) + { + char output_path[MAXPGPATH]; + int i_nspname; + int i_relname; + int i_attname; + FILE *script = NULL; + bool db_used = false; + + snprintf(output_path, sizeof(output_path), "%s/%s", + log_opts.basedir, + check->report_filename); + + /* + * Make sure we have a buffer to save reports to now that we found a + * first failing check. + */ + if (!data_type_check_failed) + initPQExpBuffer(&data_type_check_report); + data_type_check_failed = true; + + /* + * If this is the first time we see an error for the check in question + * then print a status message of the failure. + */ + if (!data_type_check_results[i]) + { + pg_log(PG_REPORT, " failed check: %s", _(check->status)); + appendPQExpBuffer(&data_type_check_report, "\n%s\n%s %s\n", + _(check->report_text), + _("A list of the problem columns is in the file:"), + output_path); + } + data_type_check_results[i] = true; + + i_nspname = PQfnumber(res, "nspname"); + i_relname = PQfnumber(res, "relname"); + i_attname = PQfnumber(res, "attname"); + + for (int rowno = 0; rowno < ntups; rowno++) + { + if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL) + pg_fatal("could not open file \"%s\": %m", output_path); + + if (!db_used) + { + fprintf(script, "In database: %s\n", dbinfo->db_name); + db_used = true; + } + fprintf(script, " %s.%s.%s\n", + PQgetvalue(res, rowno, i_nspname), + PQgetvalue(res, rowno, i_relname), + PQgetvalue(res, rowno, i_attname)); + } + + if (script) + { + fclose(script); + script = NULL; + } + } +} + /* * check_for_data_types_usage() * Detect whether there are any stored columns depending on given type(s) @@ -337,10 +464,9 @@ static void check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks) { bool found = false; - bool *results; - PQExpBufferData report; DataTypesUsageChecks *tmp = checks; int n_data_types_usage_checks = 0; + AsyncTask *task = async_task_create(); prep_status("Checking for data type usage"); @@ -352,176 +478,51 @@ check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks) } /* Prepare an array to store the results of checks in */ - results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks); + data_type_check_results = pg_malloc0(sizeof(bool) * n_data_types_usage_checks); + data_type_check_failed = false; - /* - * Connect to each database in the cluster and run all defined checks - * against that database before trying the next one. - */ - for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++) + for (int i = 0; i < n_data_types_usage_checks; i++) { - DbInfo *active_db = &cluster->dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(cluster, active_db->db_name); + DataTypesUsageChecks *check = &checks[i]; - for (int checknum = 0; checknum < n_data_types_usage_checks; checknum++) + if (check->threshold_version == MANUAL_CHECK) { - PGresult *res; - int ntups; - int i_nspname; - int i_relname; - int i_attname; - FILE *script = NULL; - bool db_used = false; - char output_path[MAXPGPATH]; - DataTypesUsageChecks *cur_check = &checks[checknum]; - - if (cur_check->threshold_version == MANUAL_CHECK) - { - Assert(cur_check->version_hook); - - /* - * Make sure that the check applies to the current cluster - * version and skip if not. If no check hook has been defined - * we run the check for all versions. - */ - if (!cur_check->version_hook(cluster)) - continue; - } - else if (cur_check->threshold_version != ALL_VERSIONS) - { - if (GET_MAJOR_VERSION(cluster->major_version) > cur_check->threshold_version) - continue; - } - else - Assert(cur_check->threshold_version == ALL_VERSIONS); - - snprintf(output_path, sizeof(output_path), "%s/%s", - log_opts.basedir, - cur_check->report_filename); - - /* - * The type(s) of interest might be wrapped in a domain, array, - * composite, or range, and these container types can be nested - * (to varying extents depending on server version, but that's not - * of concern here). To handle all these cases we need a - * recursive CTE. - */ - res = executeQueryOrDie(conn, - "WITH RECURSIVE oids AS ( " - /* start with the type(s) returned by base_query */ - " %s " - " UNION ALL " - " SELECT * FROM ( " - /* inner WITH because we can only reference the CTE once */ - " WITH x AS (SELECT oid FROM oids) " - /* domains on any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' " - " UNION ALL " - /* arrays over any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' " - " UNION ALL " - /* composite types containing any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x " - " WHERE t.typtype = 'c' AND " - " t.oid = c.reltype AND " - " c.oid = a.attrelid AND " - " NOT a.attisdropped AND " - " a.atttypid = x.oid " - " UNION ALL " - /* ranges containing any type selected so far */ - " SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x " - " WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid" - " ) foo " - ") " - /* now look for stored columns of any such type */ - "SELECT n.nspname, c.relname, a.attname " - "FROM pg_catalog.pg_class c, " - " pg_catalog.pg_namespace n, " - " pg_catalog.pg_attribute a " - "WHERE c.oid = a.attrelid AND " - " NOT a.attisdropped AND " - " a.atttypid IN (SELECT oid FROM oids) AND " - " c.relkind IN (" - CppAsString2(RELKIND_RELATION) ", " - CppAsString2(RELKIND_MATVIEW) ", " - CppAsString2(RELKIND_INDEX) ") AND " - " c.relnamespace = n.oid AND " - /* exclude possible orphaned temp tables */ - " n.nspname !~ '^pg_temp_' AND " - " n.nspname !~ '^pg_toast_temp_' AND " - /* exclude system catalogs, too */ - " n.nspname NOT IN ('pg_catalog', 'information_schema')", - cur_check->base_query); - - ntups = PQntuples(res); + Assert(check->version_hook); /* - * The datatype was found, so extract the data and log to the - * requested filename. We need to open the file for appending - * since the check might have already found the type in another - * database earlier in the loop. + * Make sure that the check applies to the current cluster version + * and skip it if not. */ - if (ntups) - { - /* - * Make sure we have a buffer to save reports to now that we - * found a first failing check. - */ - if (!found) - initPQExpBuffer(&report); - found = true; - - /* - * If this is the first time we see an error for the check in - * question then print a status message of the failure. - */ - if (!results[checknum]) - { - pg_log(PG_REPORT, " failed check: %s", _(cur_check->status)); - appendPQExpBuffer(&report, "\n%s\n%s %s\n", - _(cur_check->report_text), - _("A list of the problem columns is in the file:"), - output_path); - } - results[checknum] = true; - - i_nspname = PQfnumber(res, "nspname"); - i_relname = PQfnumber(res, "relname"); - i_attname = PQfnumber(res, "attname"); - - for (int rowno = 0; rowno < ntups; rowno++) - { - if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - - if (!db_used) - { - fprintf(script, "In database: %s\n", active_db->db_name); - db_used = true; - } - fprintf(script, " %s.%s.%s\n", - PQgetvalue(res, rowno, i_nspname), - PQgetvalue(res, rowno, i_relname), - PQgetvalue(res, rowno, i_attname)); - } - - if (script) - { - fclose(script); - script = NULL; - } - } - - PQclear(res); + if (!check->version_hook(cluster)) + continue; } + else if (check->threshold_version != ALL_VERSIONS) + { + if (GET_MAJOR_VERSION(cluster->major_version) > check->threshold_version) + continue; + } + else + Assert(check->threshold_version == ALL_VERSIONS); - PQfinish(conn); + async_task_add_step(task, data_type_check_query, + data_type_check_process, true, + (void *) (intptr_t) i); } + /* + * Connect to each database in the cluster and run all defined checks + * against that database before trying the next one. + */ + async_task_run(task, cluster); + async_task_free(task); + if (found) - pg_fatal("Data type checks failed: %s", report.data); + { + pg_fatal("Data type checks failed: %s", data_type_check_report.data); + termPQExpBuffer(&data_type_check_report); + } - pg_free(results); + pg_free(data_type_check_results); check_ok(); } -- 2.39.3 (Apple Git-146)