From 7b3a26bb8e418bdf1920f5fe9fe97afd2939d33a Mon Sep 17 00:00:00 2001 From: Nathan Bossart Date: Wed, 28 Aug 2024 10:45:59 -0500 Subject: [PATCH v12 01/11] Introduce framework for parallelizing various pg_upgrade tasks. A number of pg_upgrade steps require connecting to every database in the cluster and running the same query in each one. When there are many databases, these steps are particularly time-consuming, especially since these steps are performed sequentially in a single process. This commit introduces a new framework that makes it easy to parallelize most of these once-in-each-database tasks. Specifically, it manages a simple state machine of slots and uses libpq's asynchronous APIs to establish the connections and run the queries. The --jobs option is used to determine the number of slots to use. To use this new task framework, callers simply need to provide the query and a callback function to process its results, and the framework takes care of the rest. A more complete description is provided at the top of the new task.c file. None of the eligible once-in-each-database tasks are converted to use this new framework in this commit. That will be done via several follow-up commits. Reviewed-by: Jeff Davis, Robert Haas, Daniel Gustafsson, Ilya Gladyshev, Corey Huinker Discussion: https://postgr.es/m/20240516211638.GA1688936%40nathanxps13 --- doc/src/sgml/ref/pgupgrade.sgml | 6 +- src/bin/pg_upgrade/Makefile | 1 + src/bin/pg_upgrade/meson.build | 1 + src/bin/pg_upgrade/pg_upgrade.h | 21 ++ src/bin/pg_upgrade/task.c | 483 +++++++++++++++++++++++++++++++ src/tools/pgindent/typedefs.list | 5 + 6 files changed, 514 insertions(+), 3 deletions(-) create mode 100644 src/bin/pg_upgrade/task.c diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml index 9877f2f01c..fc2d0ff845 100644 --- a/doc/src/sgml/ref/pgupgrade.sgml +++ b/doc/src/sgml/ref/pgupgrade.sgml @@ -118,7 +118,7 @@ PostgreSQL documentation - number of simultaneous processes or threads to use + number of simultaneous connections and processes/threads to use @@ -587,8 +587,8 @@ NET STOP postgresql-&majorversion; The option allows multiple CPU cores to be used - for copying/linking of files and to dump and restore database schemas - in parallel; a good place to start is the maximum of the number of + for copying/linking of files, dumping and restoring database schemas + in parallel, etc.; a good place to start is the maximum of the number of CPU cores and tablespaces. This option can dramatically reduce the time to upgrade a multi-database server running on a multiprocessor machine. diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile index bde91e2beb..f83d2b5d30 100644 --- a/src/bin/pg_upgrade/Makefile +++ b/src/bin/pg_upgrade/Makefile @@ -25,6 +25,7 @@ OBJS = \ relfilenumber.o \ server.o \ tablespace.o \ + task.o \ util.o \ version.o diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build index 9825fa3305..3d88419674 100644 --- a/src/bin/pg_upgrade/meson.build +++ b/src/bin/pg_upgrade/meson.build @@ -14,6 +14,7 @@ pg_upgrade_sources = files( 'relfilenumber.c', 'server.c', 'tablespace.c', + 'task.c', 'util.c', 'version.c', ) diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index cdb6e2b759..53f693c2d4 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -494,3 +494,24 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr char *old_pgdata, char *new_pgdata, char *old_tablespace); bool reap_child(bool wait_for_child); + +/* task.c */ + +typedef void (*UpgradeTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg); + +/* struct definition is private to task.c */ +typedef struct UpgradeTask UpgradeTask; + +UpgradeTask *upgrade_task_create(void); +void upgrade_task_add_step(UpgradeTask *task, const char *query, + UpgradeTaskProcessCB process_cb, bool free_result, + void *arg); +void upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster); +void upgrade_task_free(UpgradeTask *task); + +/* convenient type for common private data needed by several tasks */ +typedef struct +{ + FILE *file; + char path[MAXPGPATH]; +} UpgradeTaskReport; diff --git a/src/bin/pg_upgrade/task.c b/src/bin/pg_upgrade/task.c new file mode 100644 index 0000000000..3618dc08ff --- /dev/null +++ b/src/bin/pg_upgrade/task.c @@ -0,0 +1,483 @@ +/* + * task.c + * framework for parallelizing pg_upgrade's once-in-each-database tasks + * + * This framework provides an efficient way of running the various + * once-in-each-database tasks required by pg_upgrade. Specifically, it + * parallelizes these tasks by managing a simple state machine of + * user_opts.jobs slots and using libpq's asynchronous APIs to establish the + * connections and run the queries. Callers simply need to create a callback + * function and build/execute an UpgradeTask. A simple example follows: + * + * static void + * my_process_cb(DbInfo *dbinfo, PGresult *res, void *arg) + * { + * for (int i = 0; i < PQntuples(res); i++) + * { + * ... process results ... + * } + * } + * + * void + * my_task(ClusterInfo *cluster) + * { + * UpgradeTask *task = upgrade_task_create(); + * + * upgrade_task_add_step(task, + * "... query text ...", + * my_process_cb, + * true, // let the task free the PGresult + * NULL); // "arg" pointer for callback + * upgrade_task_run(task, cluster); + * upgrade_task_free(task); + * } + * + * Note that multiple steps can be added to a given task. When there are + * multiple steps, the task will run all of the steps consecutively in the same + * database connection before freeing the connection and moving on. In other + * words, it only ever initiates one connection to each database in the + * cluster for a given run. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * src/bin/pg_upgrade/task.c + */ + +#include "postgres_fe.h" + +#include "common/connect.h" +#include "fe_utils/string_utils.h" +#include "pg_upgrade.h" + +/* + * dbs_complete stores the number of databases that we have completed + * processing. When this value equals the number of databases in the cluster, + * the task is finished. + */ +static int dbs_complete; + +/* + * dbs_processing stores the index of the next database in the cluster's array + * of databases that will be picked up for processing. It will always be + * greater than or equal to dbs_complete. + */ +static int dbs_processing; + +/* + * This struct stores the information for a single step of a task. Note that + * the query string is stored in the "queries" PQExpBuffer for the UpgradeTask. + * All steps in a task are run in a single connection before moving on to the + * next database (which requires a new connection). + */ +typedef struct UpgradeTaskStep +{ + UpgradeTaskProcessCB process_cb; /* processes the results of the query */ + bool free_result; /* should we free the result? */ + void *arg; /* pointer passed to process_cb */ +} UpgradeTaskStep; + +/* + * This struct is a thin wrapper around an array of steps, i.e., + * UpgradeTaskStep, plus a PQExpBuffer for all the query strings. + */ +typedef struct UpgradeTask +{ + UpgradeTaskStep *steps; + int num_steps; + PQExpBuffer queries; +} UpgradeTask; + +/* + * The different states for a parallel slot. + */ +typedef enum +{ + FREE, /* slot available for use in a new database */ + CONNECTING, /* waiting for connection to be established */ + RUNNING_QUERIES, /* running/processing queries in the task */ +} UpgradeTaskSlotState; + +/* + * We maintain an array of user_opts.jobs slots to execute the task. + */ +typedef struct +{ + UpgradeTaskSlotState state; /* state of the slot */ + int db_idx; /* index of the database assigned to slot */ + int step_idx; /* index of the current step of task */ + PGconn *conn; /* current connection managed by slot */ + bool ready; /* slot is ready for processing */ + bool select_mode; /* select() mode: true->read, false->write */ + int sock; /* file descriptor for connection's socket */ +} UpgradeTaskSlot; + +/* + * Initializes an UpgradeTask. + */ +UpgradeTask * +upgrade_task_create(void) +{ + UpgradeTask *task = pg_malloc0(sizeof(UpgradeTask)); + + task->queries = createPQExpBuffer(); + + /* All tasks must first set a secure search_path. */ + upgrade_task_add_step(task, ALWAYS_SECURE_SEARCH_PATH_SQL, NULL, true, NULL); + + return task; +} + +/* + * Frees all storage associated with an UpgradeTask. + */ +void +upgrade_task_free(UpgradeTask *task) +{ + if (task->steps) + pg_free(task->steps); + + destroyPQExpBuffer(task->queries); + + pg_free(task); +} + +/* + * Adds a step to an UpgradeTask. The steps will be executed in each database + * in the order in which they are added. + * + * task: task object that must have been initialized via upgrade_task_create() + * query: the query text + * process_cb: function that processes the results of the query + * free_result: should we free the PGresult, or leave it to the caller? + * arg: pointer to task-specific data that is passed to each callback + */ +void +upgrade_task_add_step(UpgradeTask *task, const char *query, + UpgradeTaskProcessCB process_cb, bool free_result, + void *arg) +{ + UpgradeTaskStep *new_step; + + task->steps = pg_realloc(task->steps, + ++task->num_steps * sizeof(UpgradeTaskStep)); + + new_step = &task->steps[task->num_steps - 1]; + new_step->process_cb = process_cb; + new_step->free_result = free_result; + new_step->arg = arg; + + appendPQExpBuffer(task->queries, "%s;", query); +} + +/* + * Build a connection string for the slot's current database and asynchronously + * start a new connection, but do not wait for the connection to be + * established. + */ +static void +start_conn(const ClusterInfo *cluster, UpgradeTaskSlot *slot) +{ + PQExpBufferData conn_opts; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx]; + + /* Build connection string with proper quoting */ + initPQExpBuffer(&conn_opts); + appendPQExpBufferStr(&conn_opts, "dbname="); + appendConnStrVal(&conn_opts, dbinfo->db_name); + appendPQExpBufferStr(&conn_opts, " user="); + appendConnStrVal(&conn_opts, os_info.user); + appendPQExpBuffer(&conn_opts, " port=%d", cluster->port); + if (cluster->sockdir) + { + appendPQExpBufferStr(&conn_opts, " host="); + appendConnStrVal(&conn_opts, cluster->sockdir); + } + + slot->conn = PQconnectStart(conn_opts.data); + + if (!slot->conn) + pg_fatal("failed to create connection with connection string: \"%s\"", + conn_opts.data); + + termPQExpBuffer(&conn_opts); +} + +/* + * Run the process_cb callback function to process the result of a query, and + * free the result if the caller indicated we should do so. + */ +static void +process_query_result(const ClusterInfo *cluster, UpgradeTaskSlot *slot, + const UpgradeTask *task) +{ + UpgradeTaskStep *steps = &task->steps[slot->step_idx]; + UpgradeTaskProcessCB process_cb = steps->process_cb; + DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db_idx]; + PGresult *res = PQgetResult(slot->conn); + + if (PQstatus(slot->conn) == CONNECTION_BAD || + (PQresultStatus(res) != PGRES_TUPLES_OK && + PQresultStatus(res) != PGRES_COMMAND_OK)) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + + /* + * We assume that a NULL process_cb callback function means there's + * nothing to process. This is primarily intended for the inital step in + * every task that sets a safe search_path. + */ + if (process_cb) + (*process_cb) (dbinfo, res, steps->arg); + + if (steps->free_result) + PQclear(res); +} + +/* + * Advances the state machine for a given slot as necessary. + */ +static void +process_slot(const ClusterInfo *cluster, UpgradeTaskSlot *slot, const UpgradeTask *task) +{ + if (!slot->ready) + return; + + switch (slot->state) + { + case FREE: + + /* + * If all of the databases in the cluster have been processed or + * are currently being processed by other slots, we are done. + */ + if (dbs_processing >= cluster->dbarr.ndbs) + return; + + /* + * Claim the next database in the cluster's array and initiate a + * new connection. + */ + slot->db_idx = dbs_processing++; + slot->state = CONNECTING; + start_conn(cluster, slot); + + return; + + case CONNECTING: + + /* Check for connection failure. */ + if (PQstatus(slot->conn) == CONNECTION_BAD) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + + /* Check whether the connection is still establishing. */ + if (PQstatus(slot->conn) != CONNECTION_OK) + return; + + /* + * Move on to running/processing the queries in the task. + */ + slot->state = RUNNING_QUERIES; + if (!PQsendQuery(slot->conn, task->queries->data)) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + + return; + + case RUNNING_QUERIES: + + /* + * Consume any available data and clear the read-ready indicator + * for the connection. + */ + if (!PQconsumeInput(slot->conn)) + pg_fatal("connection failure: %s", PQerrorMessage(slot->conn)); + + /* + * Process any results that are ready so that we can free up this + * slot for another database as soon as possible. + */ + for (; slot->step_idx < task->num_steps; slot->step_idx++) + { + /* If no more results are available yet, move on. */ + if (PQisBusy(slot->conn)) + return; + + process_query_result(cluster, slot, task); + } + + /* + * If we just finished processing the result of the last step in + * the task, free the slot. We recursively call this function on + * the newly-freed slot so that we can start initiating the next + * connection immediately instead of waiting for the next loop + * through the slots. + */ + dbs_complete++; + PQfinish(slot->conn); + memset(slot, 0, sizeof(UpgradeTaskSlot)); + slot->ready = true; + + process_slot(cluster, slot, task); + + return; + } +} + +/* + * Returns -1 on error, else the number of ready descriptors. + */ +static int +select_loop(int maxFd, fd_set *input, fd_set *output, bool nowait) +{ + fd_set save_input = *input; + fd_set save_output = *output; + struct timeval timeout = {0, 0}; + + if (maxFd == 0) + return 0; + + for (;;) + { + int i; + + *input = save_input; + *output = save_output; + + i = select(maxFd + 1, input, output, NULL, nowait ? &timeout : NULL); + +#ifndef WIN32 + if (i < 0 && errno == EINTR) + continue; +#else + if (i == SOCKET_ERROR && WSAGetLastError() == WSAEINTR) + continue; +#endif + return i; + } +} + +/* + * Wait on the slots to either finish connecting or to receive query results if + * possible. This avoids a tight loop in upgrade_task_run(). + */ +static void +wait_on_slots(UpgradeTaskSlot *slots, int numslots) +{ + fd_set input; + fd_set output; + int maxFd = 0; + bool skip_wait = false; + + FD_ZERO(&input); + FD_ZERO(&output); + + for (int i = 0; i < numslots; i++) + { + PostgresPollingStatusType status; + + switch (slots[i].state) + { + case FREE: + + /* + * This function should only ever see free slots as we are + * finishing processing the last few databases, at which point + * we don't have any databases left for them to process. We'll + * never use these slots again, so we can safely ignore them. + */ + slots[i].ready = false; + continue; + + case CONNECTING: + + /* + * Don't call PQconnectPoll() again for this slot until + * select() tells us something is ready. Be sure to use the + * previous poll mode in this case. + */ + if (!slots[i].ready) + break; + + /* + * If we are waiting for the connection to establish, choose + * whether to wait for reading or for writing on the socket as + * appropriate. If neither apply, mark the slot as ready and + * skip waiting so that it is handled ASAP (we assume this + * means the connection is either bad or fully ready). + */ + status = PQconnectPoll(slots[i].conn); + if (status == PGRES_POLLING_READING) + slots[i].select_mode = true; + else if (status == PGRES_POLLING_WRITING) + slots[i].select_mode = false; + else + { + slots[i].ready = true; + skip_wait = true; + continue; + } + + break; + + case RUNNING_QUERIES: + + /* + * Once we've sent the queries, we must wait for the socket to + * be read-ready. Note that process_slot() handles calling + * PQconsumeInput() as required. + */ + slots[i].select_mode = true; + break; + } + + /* + * Add the socket to the set. + */ + slots[i].ready = false; + slots[i].sock = PQsocket(slots[i].conn); + if (slots[i].sock < 0) + pg_fatal("invalid socket"); + FD_SET(slots[i].sock, slots[i].select_mode ? &input : &output); + maxFd = Max(maxFd, slots[i].sock); + } + + /* + * If we found socket(s) to wait on, wait. + */ + if (select_loop(maxFd, &input, &output, skip_wait) == -1) + pg_fatal("select() failed: %m"); + + /* + * Mark which sockets appear to be ready. + */ + for (int i = 0; i < numslots; i++) + slots[i].ready |= (FD_ISSET(slots[i].sock, &input) || + FD_ISSET(slots[i].sock, &output)); +} + +/* + * Runs all the steps of the task in every database in the cluster using + * user_opts.jobs parallel slots. + */ +void +upgrade_task_run(const UpgradeTask *task, const ClusterInfo *cluster) +{ + int jobs = Max(1, user_opts.jobs); + UpgradeTaskSlot *slots = pg_malloc0(sizeof(UpgradeTaskSlot) * jobs); + + dbs_complete = 0; + dbs_processing = 0; + + /* + * Process every slot the first time round. + */ + for (int i = 0; i < jobs; i++) + slots[i].ready = true; + + while (dbs_complete < cluster->dbarr.ndbs) + { + for (int i = 0; i < jobs; i++) + process_slot(cluster, &slots[i], task); + + wait_on_slots(slots, jobs); + } + + pg_free(slots); +} diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index df3f336bec..725863f9c8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3040,6 +3040,11 @@ UnresolvedTup UnresolvedTupData UpdateContext UpdateStmt +UpgradeTask +UpgradeTaskReport +UpgradeTaskSlot +UpgradeTaskSlotState +UpgradeTaskStep UploadManifestCmd UpperRelationKind UpperUniquePath -- 2.39.3 (Apple Git-146)