From 44b6354429847e3b3aeac21ee5712879b97d7877 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Sat, 26 Oct 2024 01:11:57 +0200 Subject: [PATCH v0] wait_for_lsn protocol option --- src/backend/tcop/backend_startup.c | 15 +++++++++++++-- src/backend/tcop/dest.c | 12 ++++++++++++ src/backend/tcop/postgres.c | 23 +++++++++++++++++++++++ src/include/libpq/libpq-be.h | 1 + src/interfaces/libpq/fe-connect.c | 26 ++++++++++++++++++++++++++ src/interfaces/libpq/fe-exec.c | 1 + src/interfaces/libpq/fe-protocol3.c | 20 ++++++++++++++++++++ src/interfaces/libpq/fe-trace.c | 2 ++ src/interfaces/libpq/libpq-int.h | 3 +++ 9 files changed, 101 insertions(+), 2 deletions(-) diff --git a/src/backend/tcop/backend_startup.c b/src/backend/tcop/backend_startup.c index 2a96c81f925..bd3b91d01eb 100644 --- a/src/backend/tcop/backend_startup.c +++ b/src/backend/tcop/backend_startup.c @@ -768,12 +768,23 @@ ProcessStartupPacket(Port *port, bool ssl_done, bool gss_done) valptr), errhint("Valid values are: \"false\", 0, \"true\", 1, \"database\"."))); } + else if (strcmp(nameptr, "_pq_.wait_for_lsn") == 0) + { + if (strcmp(valptr, "1") == 0) + port->wait_for_lsn_enabled = true; + else + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for parameter \"%s\": \"%s\"", + "wait_for_lsn", + valptr), + errhint("Valid values are: 1."))); + } else if (strncmp(nameptr, "_pq_.", 5) == 0) { /* * Any option beginning with _pq_. is reserved for use as a - * protocol-level option, but at present no such options are - * defined. + * protocol-level option. */ unrecognized_protocol_options = lappend(unrecognized_protocol_options, pstrdup(nameptr)); diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c index 96f80b30463..bb9910b12d5 100644 --- a/src/backend/tcop/dest.c +++ b/src/backend/tcop/dest.c @@ -31,6 +31,8 @@ #include "access/printsimple.h" #include "access/printtup.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xlog_internal.h" #include "commands/copy.h" #include "commands/createas.h" #include "commands/explain.h" @@ -40,6 +42,7 @@ #include "executor/tstoreReceiver.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" +#include "miscadmin.h" /* ---------------- @@ -265,6 +268,15 @@ ReadyForQuery(CommandDest dest) pq_beginmessage(&buf, PqMsg_ReadyForQuery); pq_sendbyte(&buf, TransactionBlockStatusCode()); + if (MyProcPort->wait_for_lsn_enabled) + { + char xloc[MAXFNAMELEN]; + XLogRecPtr logptr; + + logptr = GetXLogWriteRecPtr(); + snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr)); + pq_sendstring(&buf, xloc); + } pq_endmessage(&buf); } /* Flush output at end of cycle in any case. */ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 7f5eada9d45..aee6fec1fc4 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -38,6 +38,7 @@ #include "commands/async.h" #include "commands/event_trigger.h" #include "commands/prepare.h" +#include "commands/waitlsn.h" #include "common/pg_prng.h" #include "jit/jit.h" #include "libpq/libpq.h" @@ -75,6 +76,7 @@ #include "utils/injection_point.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/snapmgr.h" #include "utils/timeout.h" @@ -4782,6 +4784,27 @@ PostgresMain(const char *dbname, const char *username) SetCurrentStatementStartTimestamp(); query_string = pq_getmsgstring(&input_message); + if (MyProcPort && MyProcPort->wait_for_lsn_enabled) + { + const char *wait_for_lsn = pq_getmsgstring(&input_message); + XLogRecPtr lsn; + bool error; + + lsn = pg_lsn_in_internal(wait_for_lsn, &error); + if (error) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid LSN %s", wait_for_lsn))); + if (RecoveryInProgress()) + WaitForLSNReplay(lsn, 0); + else + { + if (GetXLogWriteRecPtr() != lsn) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("LSN mismatch"))); + } + } pq_getmsgend(&input_message); if (am_walsender) diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 05cb1874c58..0f23a969231 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -150,6 +150,7 @@ typedef struct Port */ char *database_name; char *user_name; + bool wait_for_lsn_enabled; char *cmdline_options; List *guc_options; diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 64787bea511..aa56ca42b2c 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -360,6 +360,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = { "Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 15 */ offsetof(struct pg_conn, target_session_attrs)}, + {"wait_for_lsn", "PGWAITFORLSN", "0", NULL, + "Wait-For-LSN", "", 1, + offsetof(struct pg_conn, wait_for_lsn_setting)}, + {"load_balance_hosts", "PGLOADBALANCEHOSTS", DefaultLoadBalanceHosts, NULL, "Load-Balance-Hosts", "", 8, /* sizeof("disable") = 8 */ @@ -1847,6 +1851,28 @@ pqConnectOptions2(PGconn *conn) goto oom_error; } + /* + * validate wait_for_lsn option + */ + if (conn->wait_for_lsn_setting) + { + if (strcmp(conn->wait_for_lsn_setting, "on") == 0 || + strcmp(conn->wait_for_lsn_setting, "true") == 0 || + strcmp(conn->wait_for_lsn_setting, "1") == 0) + conn->wait_for_lsn_enabled = true; + else if (strcmp(conn->wait_for_lsn_setting, "off") == 0 || + strcmp(conn->wait_for_lsn_setting, "false") == 0 || + strcmp(conn->wait_for_lsn_setting, "0") == 0) + conn->wait_for_lsn_enabled = false; + else + { + conn->status = CONNECTION_BAD; + libpq_append_conn_error(conn, "invalid %s value: \"%s\"", + "wait_for_lsn", conn->wait_for_lsn_setting); + return false; + } + } + /* * Only if we get this far is it appropriate to try to connect. (We need a * state flag, rather than just the boolean result of this function, in diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c index 0d224a8524e..8d2d183476b 100644 --- a/src/interfaces/libpq/fe-exec.c +++ b/src/interfaces/libpq/fe-exec.c @@ -1454,6 +1454,7 @@ PQsendQueryInternal(PGconn *conn, const char *query, bool newQuery) /* construct the outgoing Query message */ if (pqPutMsgStart(PqMsg_Query, conn) < 0 || pqPuts(query, conn) < 0 || + (conn->wait_for_lsn_enabled && pqPuts(conn->last_lsn, conn)) || pqPutMsgEnd(conn) < 0) { /* error message should be set up already */ diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index 8c5ac1729f0..689c869ce56 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -1625,6 +1625,23 @@ getReadyForQuery(PGconn *conn) break; } + if (conn->wait_for_lsn_enabled) + { + PQExpBufferData buf; + + initPQExpBuffer(&buf); + if (pqGets(&buf, conn)) + { + termPQExpBuffer(&buf); + return EOF; + } + else + { + strlcpy(conn->last_lsn, buf.data, sizeof conn->last_lsn); + termPQExpBuffer(&buf); + } + } + return 0; } @@ -2298,6 +2315,9 @@ build_startup_packet(const PGconn *conn, char *packet, if (conn->client_encoding_initial && conn->client_encoding_initial[0]) ADD_STARTUP_OPTION("client_encoding", conn->client_encoding_initial); + if (conn->wait_for_lsn_enabled) + ADD_STARTUP_OPTION("_pq_.wait_for_lsn", "1"); + /* Add any environment-driven GUC settings needed */ for (next_eo = options; next_eo->envName; next_eo++) { diff --git a/src/interfaces/libpq/fe-trace.c b/src/interfaces/libpq/fe-trace.c index 19c5b8a8900..30217b687af 100644 --- a/src/interfaces/libpq/fe-trace.c +++ b/src/interfaces/libpq/fe-trace.c @@ -478,6 +478,7 @@ pqTraceOutput_Query(FILE *f, const char *message, int *cursor) { fprintf(f, "Query\t"); pqTraceOutputString(f, message, cursor, false); + /* FIXME */ } static void @@ -609,6 +610,7 @@ pqTraceOutput_ReadyForQuery(FILE *f, const char *message, int *cursor) { fprintf(f, "ReadyForQuery\t"); pqTraceOutputByte1(f, message, cursor); + /* FIXME */ } /* diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 9579f803538..71b9ad24d53 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -426,6 +426,7 @@ struct pg_conn char *ssl_max_protocol_version; /* maximum TLS protocol version */ char *target_session_attrs; /* desired session properties */ char *require_auth; /* name of the expected auth method */ + char *wait_for_lsn_setting; char *load_balance_hosts; /* load balance over hosts */ bool cancelRequest; /* true if this connection is used to send a @@ -448,6 +449,7 @@ struct pg_conn ConnStatusType status; PGAsyncStatusType asyncStatus; PGTransactionStatusType xactStatus; /* never changes to ACTIVE */ + char last_lsn[128]; char last_sqlstate[6]; /* last reported SQLSTATE */ bool options_valid; /* true if OK to attempt connection */ bool nonblocking; /* whether this connection is using nonblock @@ -529,6 +531,7 @@ struct pg_conn PGVerbosity verbosity; /* error/notice message verbosity */ PGContextVisibility show_context; /* whether to show CONTEXT field */ PGlobjfuncs *lobjfuncs; /* private state for large-object access fns */ + bool wait_for_lsn_enabled; pg_prng_state prng_state; /* prng state for load balancing connections */ -- 2.47.0