From 9f30600e54e79a6d1e0ca4feda1511d90092148a Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Thu, 23 Jun 2022 10:55:09 +0200 Subject: [PATCH 3/4] Refactor sending of RowDescription messages in replication protocol Some routines open-coded the construction of RowDescription messages. Instead, we have support for doing this using tuple descriptors and DestRemoteSimple, so use that instead. --- src/backend/access/common/tupdesc.c | 9 +++ src/backend/replication/basebackup_copy.c | 74 +++++++---------------- src/backend/replication/walsender.c | 29 +++------ 3 files changed, 40 insertions(+), 72 deletions(-) diff --git a/src/backend/access/common/tupdesc.c b/src/backend/access/common/tupdesc.c index 9f41b1e854..d6fb261e20 100644 --- a/src/backend/access/common/tupdesc.c +++ b/src/backend/access/common/tupdesc.c @@ -739,6 +739,15 @@ TupleDescInitBuiltinEntry(TupleDesc desc, att->attcollation = InvalidOid; break; + case OIDOID: + att->attlen = 4; + att->attbyval = true; + att->attalign = TYPALIGN_INT; + att->attstorage = TYPSTORAGE_PLAIN; + att->attcompression = InvalidCompressionMethod; + att->attcollation = InvalidOid; + break; + default: elog(ERROR, "unsupported type %u", oidtypeid); } diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c index 1eed9d8c3f..df0471a7a4 100644 --- a/src/backend/replication/basebackup_copy.c +++ b/src/backend/replication/basebackup_copy.c @@ -25,11 +25,13 @@ */ #include "postgres.h" +#include "access/tupdesc.h" #include "catalog/pg_type_d.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "replication/basebackup.h" #include "replication/basebackup_sink.h" +#include "tcop/dest.h" #include "utils/timestamp.h" typedef struct bbsink_copystream @@ -336,35 +338,24 @@ SendCopyDone(void) static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) { + DestReceiver *dest; + TupleDesc tupdesc; StringInfoData buf; char str[MAXFNAMELEN]; Size len; - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 2); /* 2 fields */ - - /* Field headers */ - pq_sendstring(&buf, "recptr"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, TEXTOID); /* type oid */ - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - pq_sendstring(&buf, "tli"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ + dest = CreateDestReceiver(DestRemoteSimple); + tupdesc = CreateTemplateTupleDesc(2); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "recptr", TEXTOID, -1, 0); /* * int8 may seem like a surprising data type for this, but in theory int4 * would not be wide enough for this, as TimeLineID is unsigned. */ - pq_sendint32(&buf, INT8OID); /* type oid */ - pq_sendint16(&buf, 8); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0); + + /* send RowDescription */ + dest->rStartup(dest, CMD_SELECT, tupdesc); /* Data row */ pq_beginmessage(&buf, 'D'); @@ -391,41 +382,22 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) static void SendTablespaceList(List *tablespaces) { + DestReceiver *dest; + TupleDesc tupdesc; StringInfoData buf; ListCell *lc; - /* Construct and send the directory information */ - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint16(&buf, 3); /* 3 fields */ - - /* First field - spcoid */ - pq_sendstring(&buf, "spcoid"); - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, OIDOID); /* type oid */ - pq_sendint16(&buf, 4); /* typlen */ - pq_sendint32(&buf, 0); /* typmod */ - pq_sendint16(&buf, 0); /* format code */ - - /* Second field - spclocation */ - pq_sendstring(&buf, "spclocation"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, TEXTOID); - pq_sendint16(&buf, -1); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - - /* Third field - size */ - pq_sendstring(&buf, "size"); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_sendint32(&buf, INT8OID); - pq_sendint16(&buf, 8); - pq_sendint32(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); + dest = CreateDestReceiver(DestRemoteSimple); + + tupdesc = CreateTemplateTupleDesc(3); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "spcoid", OIDOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "spclocation", TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0); + /* send RowDescription */ + dest->rStartup(dest, CMD_SELECT, tupdesc); + + /* Construct and send the directory information */ foreach(lc, tablespaces) { tablespaceinfo *ti = lfirst(lc); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index fa60c92e13..2c9e190685 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -579,6 +579,8 @@ ReadReplicationSlot(ReadReplicationSlotCmd *cmd) static void SendTimeLineHistory(TimeLineHistoryCmd *cmd) { + DestReceiver *dest; + TupleDesc tupdesc; StringInfoData buf; char histfname[MAXFNAMELEN]; char path[MAXPGPATH]; @@ -587,36 +589,21 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) off_t bytesleft; Size len; + dest = CreateDestReceiver(DestRemoteSimple); + /* * Reply with a result set with one row, and two columns. The first col is * the name of the history file, 2nd is the contents. */ + tupdesc = CreateTemplateTupleDesc(2); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0); TLHistoryFileName(histfname, cmd->timeline); TLHistoryFilePath(path, cmd->timeline); /* Send a RowDescription message */ - pq_beginmessage(&buf, 'T'); - pq_sendint16(&buf, 2); /* 2 fields */ - - /* first field */ - pq_sendstring(&buf, "filename"); /* col name */ - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, TEXTOID); /* type oid */ - pq_sendint16(&buf, -1); /* typlen */ - pq_sendint32(&buf, 0); /* typmod */ - pq_sendint16(&buf, 0); /* format code */ - - /* second field */ - pq_sendstring(&buf, "content"); /* col name */ - pq_sendint32(&buf, 0); /* table oid */ - pq_sendint16(&buf, 0); /* attnum */ - pq_sendint32(&buf, TEXTOID); /* type oid */ - pq_sendint16(&buf, -1); /* typlen */ - pq_sendint32(&buf, 0); /* typmod */ - pq_sendint16(&buf, 0); /* format code */ - pq_endmessage(&buf); + dest->rStartup(dest, CMD_SELECT, tupdesc); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); -- 2.36.1