From c148ee632d1fcb2f52c20a97ed0a569b3a20f88d Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 28 Jun 2024 17:21:19 -0500 Subject: [PATCH v12 02/11] Use pg_upgrade's new parallel framework for subscription checks. Reviewed-by: FIXME Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- src/bin/pg_upgrade/check.c | 206 ++++++++++++++++++++----------------- 1 file changed, 111 insertions(+), 95 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 96adea41e9..f8160e0140 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1905,6 +1905,38 @@ check_old_cluster_for_valid_slots(void) check_ok(); } +/* + * Callback function for processing results of query for + * check_old_cluster_subscription_state()'s UpgradeTask. If the query returned + * any rows (i.e., the check failed), write the details to the report file. + */ +static void +process_old_sub_state_check(DbInfo *dbinfo, PGresult *res, void *arg) +{ + UpgradeTaskReport *report = (UpgradeTaskReport *) arg; + int ntup = PQntuples(res); + int i_srsubstate = PQfnumber(res, "srsubstate"); + int i_subname = PQfnumber(res, "subname"); + int i_nspname = PQfnumber(res, "nspname"); + int i_relname = PQfnumber(res, "relname"); + + AssertVariableIsOfType(&process_old_sub_state_check, UpgradeTaskProcessCB); + + for (int i = 0; i < ntup; i++) + { + if (report->file == NULL && + (report->file = fopen_priv(report->path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report->path); + + fprintf(report->file, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", + PQgetvalue(res, i, i_srsubstate), + dbinfo->db_name, + PQgetvalue(res, i, i_subname), + PQgetvalue(res, i, i_nspname), + PQgetvalue(res, i, i_relname)); + } +} + /* * check_old_cluster_subscription_state() * @@ -1915,115 +1947,99 @@ check_old_cluster_for_valid_slots(void) static void check_old_cluster_subscription_state(void) { - FILE *script = NULL; - char output_path[MAXPGPATH]; + UpgradeTask *task = upgrade_task_create(); + UpgradeTaskReport report; + const char *query; + PGresult *res; + PGconn *conn; int ntup; prep_status("Checking for subscription state"); - snprintf(output_path, sizeof(output_path), "%s/%s", + report.file = NULL; + snprintf(report.path, sizeof(report.path), "%s/%s", log_opts.basedir, "subs_invalid.txt"); - for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - PGresult *res; - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(&old_cluster, active_db->db_name); - - /* We need to check for pg_replication_origin only once. */ - if (dbnum == 0) - { - /* - * Check that all the subscriptions have their respective - * replication origin. - */ - res = executeQueryOrDie(conn, - "SELECT d.datname, s.subname " - "FROM pg_catalog.pg_subscription s " - "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " - " ON o.roname = 'pg_' || s.oid " - "INNER JOIN pg_catalog.pg_database d " - " ON d.oid = s.subdbid " - "WHERE o.roname IS NULL;"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", - PQgetvalue(res, i, 0), - PQgetvalue(res, i, 1)); - } - PQclear(res); - } - - /* - * We don't allow upgrade if there is a risk of dangling slot or - * origin corresponding to initial sync after upgrade. - * - * A slot/origin not created yet refers to the 'i' (initialize) state, - * while 'r' (ready) state refers to a slot/origin created previously - * but already dropped. These states are supported for pg_upgrade. The - * other states listed below are not supported: - * - * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state - * would retain a replication slot, which could not be dropped by the - * sync worker spawned after the upgrade because the subscription ID - * used for the slot name won't match anymore. - * - * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state - * would retain the replication origin when there is a failure in - * tablesync worker immediately after dropping the replication slot in - * the publisher. - * - * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on - * a relation upgraded while in this state would expect an origin ID - * with the OID of the subscription used before the upgrade, causing - * it to fail. - * - * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and - * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, - * so we need not allow these states. - */ - res = executeQueryOrDie(conn, - "SELECT r.srsubstate, s.subname, n.nspname, c.relname " - "FROM pg_catalog.pg_subscription_rel r " - "LEFT JOIN pg_catalog.pg_subscription s" - " ON r.srsubid = s.oid " - "LEFT JOIN pg_catalog.pg_class c" - " ON r.srrelid = c.oid " - "LEFT JOIN pg_catalog.pg_namespace n" - " ON c.relnamespace = n.oid " - "WHERE r.srsubstate NOT IN ('i', 'r') " - "ORDER BY s.subname"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - - fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", - PQgetvalue(res, i, 0), - active_db->db_name, - PQgetvalue(res, i, 1), - PQgetvalue(res, i, 2), - PQgetvalue(res, i, 3)); - } - PQclear(res); - PQfinish(conn); + /* + * Check that all the subscriptions have their respective replication + * origin. This check only needs to run once. + */ + conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name); + res = executeQueryOrDie(conn, + "SELECT d.datname, s.subname " + "FROM pg_catalog.pg_subscription s " + "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " + " ON o.roname = 'pg_' || s.oid " + "INNER JOIN pg_catalog.pg_database d " + " ON d.oid = s.subdbid " + "WHERE o.roname IS NULL;"); + ntup = PQntuples(res); + for (int i = 0; i < ntup; i++) + { + if (report.file == NULL && + (report.file = fopen_priv(report.path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", report.path); + fprintf(report.file, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", + PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); } + PQclear(res); + PQfinish(conn); - if (script) + /* + * We don't allow upgrade if there is a risk of dangling slot or origin + * corresponding to initial sync after upgrade. + * + * A slot/origin not created yet refers to the 'i' (initialize) state, + * while 'r' (ready) state refers to a slot/origin created previously but + * already dropped. These states are supported for pg_upgrade. The other + * states listed below are not supported: + * + * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would + * retain a replication slot, which could not be dropped by the sync + * worker spawned after the upgrade because the subscription ID used for + * the slot name won't match anymore. + * + * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would + * retain the replication origin when there is a failure in tablesync + * worker immediately after dropping the replication slot in the + * publisher. + * + * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a + * relation upgraded while in this state would expect an origin ID with + * the OID of the subscription used before the upgrade, causing it to + * fail. + * + * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and + * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, so we + * need not allow these states. + */ + query = "SELECT r.srsubstate, s.subname, n.nspname, c.relname " + "FROM pg_catalog.pg_subscription_rel r " + "LEFT JOIN pg_catalog.pg_subscription s" + " ON r.srsubid = s.oid " + "LEFT JOIN pg_catalog.pg_class c" + " ON r.srrelid = c.oid " + "LEFT JOIN pg_catalog.pg_namespace n" + " ON c.relnamespace = n.oid " + "WHERE r.srsubstate NOT IN ('i', 'r') " + "ORDER BY s.subname"; + + upgrade_task_add_step(task, query, process_old_sub_state_check, + true, &report); + + upgrade_task_run(task, &old_cluster); + upgrade_task_free(task); + + if (report.file) { - fclose(script); + fclose(report.file); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n" "You can allow the initial sync to finish for all relations and then restart the upgrade.\n" "A list of the problematic subscriptions is in the file:\n" - " %s", output_path); + " %s", report.path); } else check_ok(); -- 2.39.3 (Apple Git-146)