From 0331c1bbb8bee8fa079c761c4a0174212312e709 Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Fri, 28 Jun 2024 11:02:44 -0500 Subject: [PATCH v7 01/11] introduce framework for parallelizing pg_upgrade tasks --- src/bin/pg_upgrade/Makefile | 1 + src/bin/pg_upgrade/async.c | 505 +++++++++++++++++++++++++++++++ src/bin/pg_upgrade/meson.build | 1 + src/bin/pg_upgrade/pg_upgrade.h | 16 + src/tools/pgindent/typedefs.list | 4 + 5 files changed, 527 insertions(+) create mode 100644 src/bin/pg_upgrade/async.c diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile index bde91e2beb..3bc4f5d740 100644 --- a/src/bin/pg_upgrade/Makefile +++ b/src/bin/pg_upgrade/Makefile @@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ $(WIN32RES) \ + async.o \ check.o \ controldata.o \ dump.o \ diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c new file mode 100644 index 0000000000..b5b36c4725 --- /dev/null +++ b/src/bin/pg_upgrade/async.c @@ -0,0 +1,505 @@ +/* + * async.c + * parallelization via libpq's async APIs + * + * This framework provides an efficient way of running the various + * once-in-each-database tasks required by pg_upgrade. Specifically, it + * parallelizes these tasks by managing a simple state machine of + * user_opts.jobs slots and using libpq's asynchronous APIs to establish the + * connections and run the queries. Callers simply need to create a couple of + * callback functions and build/execute an AsyncTask. A simple example + * follows: + * + * static char * + * my_query_cb(void *arg) + * { + * // NB: RESULT MUST BE ALLOC'D! + * return pg_strdup("... query text ..."); + * } + * + * static void + * my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg) + * { + * for (int i = 0; i < PQntuples(res); i++) + * { + * ... process results ... + * } + * } + * + * void + * my_task(ClusterInfo *cluster) + * { + * AsyncTask *task = async_task_create(); + * + * async_task_add_step(task, + * my_query_cb, + * my_process_cb, + * true, // let the task free the PGresult + * NULL); // "arg" pointer for the callbacks + * async_task_run(task, cluster); + * async_task_free(task); + * } + * + * Note that multiple steps can be added to a given task. When there are + * multiple steps, the task will run all of the steps consecutively in the same + * database connection before freeing the connection and moving on. In other + * words, it only ever initiates one connection to each database in the + * cluster for a given run. + * + * Also note that the query callbacks (i.e., the AsyncTaskGetQueryCB given to + * the task) are only run once, and their result is stored and reused in all + * databases. In practice, this means that the query callbacks cannot depend + * on anything database-specific. If you find yourself trying to use + * database-specific queries, chances are this is the wrong tool for the job. + * + * As shown in the example above, the query callbacks must return an alloc'd + * query string. This string will be freed by async_task_free(). + * + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * src/bin/pg_upgrade/async.c + */ + +#include "postgres_fe.h" + +#include "common/connect.h" +#include "fe_utils/string_utils.h" +#include "pg_upgrade.h" + +/* + * dbs_complete stores the number of databases that we have completed + * processing. When this value equals the number of databases in the cluster, + * the task is finished. + */ +static int dbs_complete; + +/* + * dbs_processing stores the index of the next database in the cluster's array + * of databases that will be picked up for processing. It will always be + * greater than or equal to dbs_complete. + */ +static int dbs_processing; + +/* + * This struct stores all the information for a single step of a task. All + * steps in a task are run in a single connection before moving on to the next + * database (which requires a new connection). Note that the query_cb will + * only be executed once, and its result will be reused for all databases in + * the cluster. + */ +typedef struct AsyncTaskCallbacks +{ + AsyncTaskGetQueryCB query_cb; /* returns an alloc'd query string */ + AsyncTaskProcessCB process_cb; /* processes the results of the query */ + char *query; /* stores the result of query_cb */ + bool free_result; /* should we free the result? */ + void *arg; /* pointer passed to each callback */ +} AsyncTaskCallbacks; + +/* + * This struct is a thin wrapper around an array of steps, i.e., + * AsyncTaskCallbacks. + */ +typedef struct AsyncTask +{ + AsyncTaskCallbacks *cbs; + int num_cb_sets; +} AsyncTask; + +/* + * The different states for a parallel slot. + */ +typedef enum +{ + FREE, /* slot available for use in a new database */ + CONNECTING, /* waiting for connection to be established */ + SETTING_SEARCH_PATH, /* waiting for search_path query to complete */ + RUNNING_QUERY, /* running/processing queries in the task */ +} AsyncSlotState; + +/* + * We maintain an array of user_opts.jobs slots to execute the task. + */ +typedef struct +{ + AsyncSlotState state; /* state of the slot */ + int db; /* index of the database assigned to slot */ + int query; /* index of the current step in the task */ + PGconn *conn; /* current connection managed by slot */ +} AsyncSlot; + +/* + * Initializes an AsyncTask. + */ +AsyncTask * +async_task_create(void) +{ + return pg_malloc0(sizeof(AsyncTask)); +} + +/* + * Frees all storage associated with an AsyncTask. + */ +void +async_task_free(AsyncTask *task) +{ + for (int i = 0; i < task->num_cb_sets; i++) + { + if (task->cbs[i].query) + pg_free(task->cbs[i].query); + } + + if (task->cbs) + pg_free(task->cbs); + + pg_free(task); +} + +/* + * Adds a step to an AsyncTask. The steps will be executed in each database in + * the order in which they are added. + * + * task: task object that must have been initialized via async_task_create() + * query_cb: function that returns an alloc'd query string + * process_cb: function that processes the results of the query + * free_result: should we free the PGresult, or leave it to the caller? + * arg: pointer to task-specific data that is passed to each callback + */ +void +async_task_add_step(AsyncTask *task, + AsyncTaskGetQueryCB query_cb, + AsyncTaskProcessCB process_cb, bool free_result, + void *arg) +{ + AsyncTaskCallbacks *new_cbs; + + task->cbs = pg_realloc(task->cbs, + ++task->num_cb_sets * sizeof(AsyncTaskCallbacks)); + + new_cbs = &task->cbs[task->num_cb_sets - 1]; + new_cbs->query_cb = query_cb; + new_cbs->process_cb = process_cb; + new_cbs->query = NULL; + new_cbs->free_result = free_result; + new_cbs->arg = arg; +} + +/* + * A simple wrapper around a pg_fatal() that includes the error message for the + * connection. + */ +static void +conn_failure(PGconn *conn) +{ + pg_fatal("connection failure: %s", PQerrorMessage(conn)); +} + +/* + * Build a connection string for the slot's current database and asynchronously + * start a new connection, but do not wait for the connection to be + * established. + */ +static void +start_conn(const ClusterInfo *cluster, AsyncSlot *slot) +{ + PQExpBufferData conn_opts; + + /* Build connection string with proper quoting */ + initPQExpBuffer(&conn_opts); + appendPQExpBufferStr(&conn_opts, "dbname="); + appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name); + appendPQExpBufferStr(&conn_opts, " user="); + appendConnStrVal(&conn_opts, os_info.user); + appendPQExpBuffer(&conn_opts, " port=%d", cluster->port); + if (cluster->sockdir) + { + appendPQExpBufferStr(&conn_opts, " host="); + appendConnStrVal(&conn_opts, cluster->sockdir); + } + + slot->conn = PQconnectStart(conn_opts.data); + termPQExpBuffer(&conn_opts); + + if (!slot->conn) + conn_failure(slot->conn); +} + +/* + * Start the next query, but do not wait for it to complete. + */ +static void +dispatch_query(const ClusterInfo *cluster, AsyncSlot *slot, + const AsyncTask *task) +{ + AsyncTaskCallbacks *cbs = &task->cbs[slot->query]; + + /* + * Note that we store the result of the query callback and reuse it for + * all databases in the cluster, so there cannot be anything + * database-specific in the query string. If your query needs something + * database-specific, chances are that you shouldn't be using the + * AsyncTask functionality. + */ + if (!cbs->query) + cbs->query = cbs->query_cb(cbs->arg); + + if (!PQsendQuery(slot->conn, cbs->query)) + conn_failure(slot->conn); +} + +/* + * Cycle through all the results for a connection and return the last one. We + * don't anticipate there ever being more than a single result for anything we + * do, so this is mostly pro forma. + */ +static PGresult * +get_last_result(PGconn *conn) +{ + PGresult *tmp; + PGresult *res = NULL; + + while ((tmp = PQgetResult(conn)) != NULL) + { + PQclear(res); + res = tmp; + if (PQstatus(conn) == CONNECTION_BAD) + conn_failure(conn); + } + + if (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK) + conn_failure(conn); + + return res; +} + +/* + * Run the process_cb callback function to process the result of a query, and + * free the result if the caller indicated we should do so. + */ +static void +process_query_result(const ClusterInfo *cluster, AsyncSlot *slot, + const AsyncTask *task) +{ + AsyncTaskCallbacks *cbs = &task->cbs[slot->query]; + AsyncTaskProcessCB process_cb = cbs->process_cb; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db]; + PGresult *res = get_last_result(slot->conn); + + (*process_cb) (dbinfo, res, cbs->arg); + + if (cbs->free_result) + PQclear(res); +} + +/* + * Advances the state machine for a given slot as necessary. + */ +static void +process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask *task) +{ + switch (slot->state) + { + case FREE: + + /* + * If all of the databases in the cluster have been processed or + * are currently being processed by other slots, we are done. + */ + if (dbs_processing >= cluster->dbarr.ndbs) + return; + + /* + * Claim the next database in the cluster's array and initiate a + * new connection. + */ + slot->db = dbs_processing++; + slot->state = CONNECTING; + start_conn(cluster, slot); + + return; + + case CONNECTING: + + /* Check for connection failure. */ + if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED) + conn_failure(slot->conn); + + /* Check whether the connection is still establishing. */ + if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK) + return; + + /* + * Move on to setting the search_path for the connection to a + * known-safe value. This is common for all tasks/steps and + * should always be done first. + */ + slot->state = SETTING_SEARCH_PATH; + if (!PQsendQuery(slot->conn, ALWAYS_SECURE_SEARCH_PATH_SQL)) + conn_failure(slot->conn); + + return; + + case SETTING_SEARCH_PATH: + + /* Check whether the query is still in progress. */ + if (!PQconsumeInput(slot->conn)) + conn_failure(slot->conn); + if (PQisBusy(slot->conn)) + return; + + /* Discard the result of the search_path query. */ + PQclear(get_last_result(slot->conn)); + + /* Start running the query for the first step in the task. */ + slot->state = RUNNING_QUERY; + dispatch_query(cluster, slot, task); + + return; + + case RUNNING_QUERY: + + /* Check whether the query is still in progress. */ + if (!PQconsumeInput(slot->conn)) + conn_failure(slot->conn); + if (PQisBusy(slot->conn)) + return; + + /* Process the query result. */ + process_query_result(cluster, slot, task); + + /* + * If we just finished processing the result of the last step in + * the task, free the slot. We recursively call this function on + * the newly-freed slot so that we can start initiating the next + * connection immediately instead of waiting for the next loop + * through the slots. + */ + if (++slot->query >= task->num_cb_sets) + { + dbs_complete++; + PQfinish(slot->conn); + memset(slot, 0, sizeof(AsyncSlot)); + + process_slot(cluster, slot, task); + + return; + } + + /* Start running the query for the next step in the task. */ + dispatch_query(cluster, slot, task); + return; + } +} + +/* + * Wait on the slots to either finish connecting or to receive query results if + * possible. This avoids a tight loop in async_task_run(). + */ +static void +wait_on_slots(AsyncSlot *slots, int numslots) +{ + fd_set input_mask; + fd_set output_mask; + fd_set except_mask; + int maxFd = 0; + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + FD_ZERO(&except_mask); + + for (int i = 0; i < numslots; i++) + { + int sock; + bool read = false; + + switch (slots[i].state) + { + case FREE: + + /* + * This function should only ever see free slots as we are + * finishing processing the last few databases, at which point + * we don't have any databases left for them to process. We'll + * never use these slots again, so we can safely ignore them. + */ + continue; + + case CONNECTING: + + /* + * If we are waiting for the connection to establish, choose + * whether to wait for reading or for writing on the socket as + * appropriate. If neither apply, just return immediately so + * that we can handle the slot. + */ + { + PostgresPollingStatusType status; + + status = PQconnectPoll(slots[i].conn); + if (status == PGRES_POLLING_READING) + read = true; + else if (status != PGRES_POLLING_WRITING) + return; + } + break; + + case SETTING_SEARCH_PATH: + case RUNNING_QUERY: + + /* + * If we've sent a query, we must wait for the socket to be + * read-ready. Note that process_slot() handles calling + * PQconsumeInput() as required. + */ + read = true; + break; + } + + /* + * If there's some problem retrieving the socket, just pretend this + * slot doesn't exist. We don't expect this to happen regularly in + * practice, so it seems unlikely to cause too much harm. + */ + sock = PQsocket(slots[i].conn); + if (sock < 0) + continue; + + /* + * Add the socket to the set. + */ + FD_SET(sock, read ? &input_mask : &output_mask); + FD_SET(sock, &except_mask); + maxFd = Max(maxFd, sock); + } + + /* + * If we found socket(s) to wait on, wait. + */ + if (maxFd != 0) + (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL); +} + +/* + * Runs all the steps of the task in every database in the cluster using + * user_opts.jobs parallel slots. + */ +void +async_task_run(const AsyncTask *task, const ClusterInfo *cluster) +{ + int jobs = Max(1, user_opts.jobs); + AsyncSlot *slots = pg_malloc0(sizeof(AsyncSlot) * jobs); + + dbs_complete = 0; + dbs_processing = 0; + + while (dbs_complete < cluster->dbarr.ndbs) + { + for (int i = 0; i < jobs; i++) + process_slot(cluster, &slots[i], task); + + wait_on_slots(slots, jobs); + } + + pg_free(slots); +} diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build index 9825fa3305..9eb48e176c 100644 --- a/src/bin/pg_upgrade/meson.build +++ b/src/bin/pg_upgrade/meson.build @@ -1,6 +1,7 @@ # Copyright (c) 2022-2024, PostgreSQL Global Development Group pg_upgrade_sources = files( + 'async.c', 'check.c', 'controldata.c', 'dump.c', diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index cdb6e2b759..104ae75f9f 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -494,3 +494,19 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr char *old_pgdata, char *new_pgdata, char *old_tablespace); bool reap_child(bool wait_for_child); + +/* async.c */ + +typedef char *(*AsyncTaskGetQueryCB) (void *arg); +typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg); + +/* struct definition is private to async.c */ +typedef struct AsyncTask AsyncTask; + +AsyncTask *async_task_create(void); +void async_task_add_step(AsyncTask *task, + AsyncTaskGetQueryCB query_cb, + AsyncTaskProcessCB process_cb, bool free_result, + void *arg); +void async_task_run(const AsyncTask *task, const ClusterInfo *cluster); +void async_task_free(AsyncTask *task); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8de9978ad8..2c43a7adcc 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -153,6 +153,10 @@ ArrayMetaState ArraySubWorkspace ArrayToken ArrayType +AsyncSlot +AsyncSlotState +AsyncTask +AsyncTaskCallbacks AsyncQueueControl AsyncQueueEntry AsyncRequest -- 2.39.3 (Apple Git-146)