From 0614958dfdc53c42a5fdc2db234ccf137323d12e Mon Sep 17 00:00:00 2001 From: Peter Eisentraut Date: Thu, 23 Jun 2022 15:25:32 +0200 Subject: [PATCH 4/4] Refactor sending of DataRow messages in replication protocol Some routines open-coded the construction of DataRow messages. Use TupOutputState struct and associated functions instead, which was already done in some places. SendTimeLineHistory() is a bit more complicated and isn't converted by this. --- src/backend/access/common/printsimple.c | 11 ++++ src/backend/replication/basebackup_copy.c | 66 ++++++++--------------- 2 files changed, 33 insertions(+), 44 deletions(-) diff --git a/src/backend/access/common/printsimple.c b/src/backend/access/common/printsimple.c index e99aa279f6..5874026e58 100644 --- a/src/backend/access/common/printsimple.c +++ b/src/backend/access/common/printsimple.c @@ -121,6 +121,17 @@ printsimple(TupleTableSlot *slot, DestReceiver *self) } break; + case OIDOID: + { + Oid num = ObjectIdGetDatum(value); + char str[11]; /* 10 digits and '\0' */ + int len; + + len = pg_lltoa(num, str); // XXX this is enough but should we make a pg_utoa? + pq_sendcountedtext(&buf, str, len, false); + } + break; + default: elog(ERROR, "unsupported type OID: %u", attr->atttypid); } diff --git a/src/backend/replication/basebackup_copy.c b/src/backend/replication/basebackup_copy.c index df0471a7a4..b6e399a5c4 100644 --- a/src/backend/replication/basebackup_copy.c +++ b/src/backend/replication/basebackup_copy.c @@ -27,11 +27,13 @@ #include "access/tupdesc.h" #include "catalog/pg_type_d.h" +#include "executor/executor.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "replication/basebackup.h" #include "replication/basebackup_sink.h" #include "tcop/dest.h" +#include "utils/builtins.h" #include "utils/timestamp.h" typedef struct bbsink_copystream @@ -86,7 +88,6 @@ static void SendCopyOutResponse(void); static void SendCopyDone(void); static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli); static void SendTablespaceList(List *tablespaces); -static void send_int8_string(StringInfoData *buf, int64 intval); static const bbsink_ops bbsink_copystream_ops = { .begin_backup = bbsink_copystream_begin_backup, @@ -339,10 +340,10 @@ static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) { DestReceiver *dest; + TupOutputState *tstate; TupleDesc tupdesc; - StringInfoData buf; - char str[MAXFNAMELEN]; - Size len; + Datum values[2]; + bool nulls[2] = {0}; dest = CreateDestReceiver(DestRemoteSimple); @@ -355,22 +356,14 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli) TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "tli", INT8OID, -1, 0); /* send RowDescription */ - dest->rStartup(dest, CMD_SELECT, tupdesc); + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); /* Data row */ - pq_beginmessage(&buf, 'D'); - pq_sendint16(&buf, 2); /* number of columns */ - - len = snprintf(str, sizeof(str), - "%X/%X", LSN_FORMAT_ARGS(ptr)); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); - - len = snprintf(str, sizeof(str), "%u", tli); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, str, len); + values[0]= CStringGetTextDatum(psprintf("%X/%X", LSN_FORMAT_ARGS(ptr))); + values[1] = Int64GetDatum(tli); + do_tup_output(tstate, values, nulls); - pq_endmessage(&buf); + end_tup_output(tstate); /* Send a CommandComplete message */ pq_puttextmessage('C', "SELECT"); @@ -383,6 +376,7 @@ static void SendTablespaceList(List *tablespaces) { DestReceiver *dest; + TupOutputState *tstate; TupleDesc tupdesc; StringInfoData buf; ListCell *lc; @@ -395,51 +389,35 @@ SendTablespaceList(List *tablespaces) TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "size", INT8OID, -1, 0); /* send RowDescription */ - dest->rStartup(dest, CMD_SELECT, tupdesc); + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); /* Construct and send the directory information */ foreach(lc, tablespaces) { tablespaceinfo *ti = lfirst(lc); + Datum values[3]; + bool nulls[3] = {0}; /* Send one datarow message */ pq_beginmessage(&buf, 'D'); pq_sendint16(&buf, 3); /* number of columns */ if (ti->path == NULL) { - pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */ - pq_sendint32(&buf, -1); + nulls[0] = true; + nulls[1] = true; } else { - Size len; - - len = strlen(ti->oid); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->oid, len); - - len = strlen(ti->path); - pq_sendint32(&buf, len); - pq_sendbytes(&buf, ti->path, len); + values[0] = ObjectIdGetDatum(ti->oid); + values[1] = CStringGetTextDatum(ti->path); } if (ti->size >= 0) - send_int8_string(&buf, ti->size / 1024); + values[2] = Int64GetDatum(ti->size / 1024); else - pq_sendint32(&buf, -1); /* NULL */ + nulls[2] = true; - pq_endmessage(&buf); + do_tup_output(tstate, values, nulls); } -} - -/* - * Send a 64-bit integer as a string via the wire protocol. - */ -static void -send_int8_string(StringInfoData *buf, int64 intval) -{ - char is[32]; - sprintf(is, INT64_FORMAT, intval); - pq_sendint32(buf, strlen(is)); - pq_sendbytes(buf, is, strlen(is)); + end_tup_output(tstate); } -- 2.36.1