From: | vignesh C <vignesh21(at)gmail(dot)com> |
---|---|
To: | Peter Smith <smithpb2250(at)gmail(dot)com> |
Cc: | PostgreSQL Hackers <pgsql-hackers(at)postgresql(dot)org> |
Subject: | Re: [HACKERS] logical decoding of two-phase transactions |
Date: | 2021-03-08 05:58:46 |
Message-ID: | CALDaNm29gOsCUtNkvHgqbbD1kbM8m67h4AqfmUWG1oTnfuPFxA@mail.gmail.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-hackers |
On Mon, Mar 8, 2021 at 7:17 AM Peter Smith <smithpb2250(at)gmail(dot)com> wrote:
>
> Please find attached the latest patch set v52*
>
Few comments:
+logicalrep_read_begin_prepare(StringInfo in,
LogicalRepBeginPrepareData *begin_data)
+{
+ /* read fields */
+ begin_data->final_lsn = pq_getmsgint64(in);
+ if (begin_data->final_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "final_lsn not set in begin message");
+ begin_data->end_lsn = pq_getmsgint64(in);
+ if (begin_data->end_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "end_lsn not set in begin message");
+ begin_data->committime = pq_getmsgint64(in);
+ begin_data->xid = pq_getmsgint(in, 4);
+
+ /* read gid (copy it into a pre-allocated buffer) */
+ strcpy(begin_data->gid, pq_getmsgstring(in));
+}
In logicalrep_read_begin_prepare we validate final_lsn & end_lsn. But
this validation is not done in logicalrep_read_commit_prepared and
logicalrep_read_rollback_prepared. Should we keep it consistent?
@@ -170,5 +237,4 @@ extern void
logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
TransactionId subxid);
extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
TransactionId *subxid);
-
#endif /* LOGICAL_PROTO_H */
This change is not required.
@@ -242,15 +244,16 @@ create_replication_slot:
$$ = (Node *) cmd;
}
/* CREATE_REPLICATION_SLOT slot TEMPORARY
LOGICAL plugin */
- | K_CREATE_REPLICATION_SLOT IDENT
opt_temporary K_LOGICAL IDENT create_slot_opt_list
+ | K_CREATE_REPLICATION_SLOT IDENT
opt_temporary opt_two_phase K_LOGICAL IDENT create_slot_opt_list
{
CreateReplicationSlotCmd *cmd;
cmd =
makeNode(CreateReplicationSlotCmd);
cmd->kind = REPLICATION_KIND_LOGICAL;
cmd->slotname = $2;
cmd->temporary = $3;
- cmd->plugin = $5;
- cmd->options = $6;
+ cmd->two_phase = $4;
+ cmd->plugin = $6;
+ cmd->options = $7;
$$ = (Node *) cmd;
}
Should we document two_phase in the below section:
CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [
RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT |
NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] }
Create a physical or logical replication slot. See Section 27.2.6 for
more about replication slots.
+ while (AnyTablesyncInProgress())
+ {
+ process_syncing_tables(begin_data.final_lsn);
+
+ /* This latch is to prevent 100% CPU looping. */
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET
| WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 1000L,
WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+ ResetLatch(MyLatch);
+ }
Should we have CHECK_FOR_INTERRUPTS inside the while loop?
+ if (begin_data.final_lsn < BiggestTablesyncLSN())
+ {
+ char psfpath[MAXPGPATH];
+
+ /*
+ * Create the spoolfile.
+ */
+ prepare_spoolfile_name(psfpath, sizeof(psfpath),
+
MyLogicalRepWorker->subid, begin_data.gid);
+ prepare_spoolfile_create(psfpath);
We can make this as a single line comment.
+ if (!found)
+ {
+ elog(DEBUG1, "Not found file \"%s\". Create it.", path);
+ psf_cur.vfd = PathNameOpenFile(path, O_RDWR | O_CREAT
| O_TRUNC | PG_BINARY);
+ if (psf_cur.vfd < 0)
+ {
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file
\"%s\": %m", path)));
+ }
+ memcpy(psf_cur.name, path, sizeof(psf_cur.name));
+ psf_cur.cur_offset = 0;
+ hentry->allow_delete = true;
+ }
+ else
+ {
+ /*
+ * Open the file and seek to the beginning because we
always want to
+ * create/overwrite this file.
+ */
+ elog(DEBUG1, "Found file \"%s\". Overwrite it.", path);
+ psf_cur.vfd = PathNameOpenFile(path, O_RDWR | O_CREAT
| O_TRUNC | PG_BINARY);
+ if (psf_cur.vfd < 0)
+ {
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file
\"%s\": %m", path)));
+ }
+ memcpy(psf_cur.name, path, sizeof(psf_cur.name));
+ psf_cur.cur_offset = 0;
+ hentry->allow_delete = true;
+ }
Except the elog message the rest of the code is the same in both if
and else, we can move the common code outside.
LOGICAL_REP_MSG_TYPE = 'Y',
+ LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
+ LOGICAL_REP_MSG_PREPARE = 'P',
+ LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
+ LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r',
LOGICAL_REP_MSG_STREAM_START = 'S',
LOGICAL_REP_MSG_STREAM_END = 'E',
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
- LOGICAL_REP_MSG_STREAM_ABORT = 'A'
+ LOGICAL_REP_MSG_STREAM_ABORT = 'A',
+ LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
} LogicalRepMsgType;
As we start adding more and more features, we will have to start
adding more message types, using meaningful characters might become
difficult. Should we start using numeric instead for the new feature
getting added?
Regards.
Vignesh
From | Date | Subject | |
---|---|---|---|
Next Message | Ajin Cherian | 2021-03-08 06:00:45 | Re: [HACKERS] logical decoding of two-phase transactions |
Previous Message | kuroda.hayato@fujitsu.com | 2021-03-08 05:58:04 | RE: [PATCH] pgbench: improve \sleep meta command |