From 2b651e53dcde039aec716de05ce70a0a9a69fef6 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 28 Jun 2024 17:21:19 -0500 Subject: [PATCH v3 2/7] use new pg_upgrade async API for subscription state checks --- src/bin/pg_upgrade/check.c | 200 ++++++++++++++++++++----------------- 1 file changed, 106 insertions(+), 94 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 27924159d6..f653fa25a5 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -1906,6 +1906,75 @@ check_old_cluster_for_valid_slots(bool live_check) check_ok(); } +/* private state for subscription state checks */ +struct substate_info +{ + FILE *script; + char output_path[MAXPGPATH]; +}; + +/* + * We don't allow upgrade if there is a risk of dangling slot or origin + * corresponding to initial sync after upgrade. + * + * A slot/origin not created yet refers to the 'i' (initialize) state, while + * 'r' (ready) state refers to a slot/origin created previously but already + * dropped. These states are supported for pg_upgrade. The other states listed + * below are not supported: + * + * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would + * retain a replication slot, which could not be dropped by the sync worker + * spawned after the upgrade because the subscription ID used for the slot name + * won't match anymore. + * + * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would + * retain the replication origin when there is a failure in tablesync worker + * immediately after dropping the replication slot in the publisher. + * + * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a + * relation upgraded while in this state would expect an origin ID with the OID + * of the subscription used before the upgrade, causing it to fail. + * + * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN: + * These states are not stored in the catalog, so we need not allow these + * states. + */ +static char * +sub_query(DbInfo *dbinfo, void *arg) +{ + return pg_strdup("SELECT r.srsubstate, s.subname, n.nspname, c.relname " + "FROM pg_catalog.pg_subscription_rel r " + "LEFT JOIN pg_catalog.pg_subscription s" + " ON r.srsubid = s.oid " + "LEFT JOIN pg_catalog.pg_class c" + " ON r.srrelid = c.oid " + "LEFT JOIN pg_catalog.pg_namespace n" + " ON c.relnamespace = n.oid " + "WHERE r.srsubstate NOT IN ('i', 'r') " + "ORDER BY s.subname"); +} + +static void +sub_process(DbInfo *dbinfo, PGresult *res, void *arg) +{ + struct substate_info *state = (struct substate_info *) arg; + int ntup = PQntuples(res); + + for (int i = 0; i < ntup; i++) + { + if (state->script == NULL && + (state->script = fopen_priv(state->output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", state->output_path); + + fprintf(state->script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", + PQgetvalue(res, i, 0), + dbinfo->db_name, + PQgetvalue(res, i, 1), + PQgetvalue(res, i, 2), + PQgetvalue(res, i, 3)); + } +} + /* * check_old_cluster_subscription_state() * @@ -1916,115 +1985,58 @@ check_old_cluster_for_valid_slots(bool live_check) static void check_old_cluster_subscription_state(void) { - FILE *script = NULL; - char output_path[MAXPGPATH]; + AsyncTask *task = async_task_create(); + struct substate_info state; + PGresult *res; + PGconn *conn; int ntup; prep_status("Checking for subscription state"); - snprintf(output_path, sizeof(output_path), "%s/%s", + state.script = NULL; + snprintf(state.output_path, sizeof(state.output_path), "%s/%s", log_opts.basedir, "subs_invalid.txt"); - for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - PGresult *res; - DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; - PGconn *conn = connectToServer(&old_cluster, active_db->db_name); - /* We need to check for pg_replication_origin only once. */ - if (dbnum == 0) - { - /* - * Check that all the subscriptions have their respective - * replication origin. - */ - res = executeQueryOrDie(conn, - "SELECT d.datname, s.subname " - "FROM pg_catalog.pg_subscription s " - "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " - " ON o.roname = 'pg_' || s.oid " - "INNER JOIN pg_catalog.pg_database d " - " ON d.oid = s.subdbid " - "WHERE o.roname IS NULL;"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); - fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", - PQgetvalue(res, i, 0), - PQgetvalue(res, i, 1)); - } - PQclear(res); - } - - /* - * We don't allow upgrade if there is a risk of dangling slot or - * origin corresponding to initial sync after upgrade. - * - * A slot/origin not created yet refers to the 'i' (initialize) state, - * while 'r' (ready) state refers to a slot/origin created previously - * but already dropped. These states are supported for pg_upgrade. The - * other states listed below are not supported: - * - * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state - * would retain a replication slot, which could not be dropped by the - * sync worker spawned after the upgrade because the subscription ID - * used for the slot name won't match anymore. - * - * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state - * would retain the replication origin when there is a failure in - * tablesync worker immediately after dropping the replication slot in - * the publisher. - * - * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on - * a relation upgraded while in this state would expect an origin ID - * with the OID of the subscription used before the upgrade, causing - * it to fail. - * - * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and - * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog, - * so we need not allow these states. - */ - res = executeQueryOrDie(conn, - "SELECT r.srsubstate, s.subname, n.nspname, c.relname " - "FROM pg_catalog.pg_subscription_rel r " - "LEFT JOIN pg_catalog.pg_subscription s" - " ON r.srsubid = s.oid " - "LEFT JOIN pg_catalog.pg_class c" - " ON r.srrelid = c.oid " - "LEFT JOIN pg_catalog.pg_namespace n" - " ON c.relnamespace = n.oid " - "WHERE r.srsubstate NOT IN ('i', 'r') " - "ORDER BY s.subname"); - - ntup = PQntuples(res); - for (int i = 0; i < ntup; i++) - { - if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL) - pg_fatal("could not open file \"%s\": %m", output_path); + /* + * Check that all the subscriptions have their respective replication + * origin. This check only needs to run once. + */ + conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name); + res = executeQueryOrDie(conn, + "SELECT d.datname, s.subname " + "FROM pg_catalog.pg_subscription s " + "LEFT OUTER JOIN pg_catalog.pg_replication_origin o " + " ON o.roname = 'pg_' || s.oid " + "INNER JOIN pg_catalog.pg_database d " + " ON d.oid = s.subdbid " + "WHERE o.roname IS NULL;"); + ntup = PQntuples(res); + for (int i = 0; i < ntup; i++) + { + if (state.script == NULL && + (state.script = fopen_priv(state.output_path, "w")) == NULL) + pg_fatal("could not open file \"%s\": %m", state.output_path); + fprintf(state.script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n", + PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); + } + PQclear(res); + PQfinish(conn); - fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n", - PQgetvalue(res, i, 0), - active_db->db_name, - PQgetvalue(res, i, 1), - PQgetvalue(res, i, 2), - PQgetvalue(res, i, 3)); - } + async_task_add_step(task, sub_query, sub_process, true, &state); - PQclear(res); - PQfinish(conn); - } + async_task_run(task, &old_cluster); + async_task_free(task); - if (script) + if (state.script) { - fclose(script); + fclose(state.script); pg_log(PG_REPORT, "fatal"); pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n" "You can allow the initial sync to finish for all relations and then restart the upgrade.\n" "A list of the problematic subscriptions is in the file:\n" - " %s", output_path); + " %s", state.output_path); } else check_ok(); -- 2.39.3 (Apple Git-146)