Re: [PATCH 11/16] Add infrastructure for manipulating multiple streams of wal on a segment handling level

From: Boszormenyi Zoltan <zb(at)cybertec(dot)at>
To: Andres Freund <andres(at)2ndquadrant(dot)com>
Cc: pgsql-hackers(at)postgresql(dot)org
Subject: Re: [PATCH 11/16] Add infrastructure for manipulating multiple streams of wal on a segment handling level
Date: 2012-06-29 12:43:52
Message-ID: 4FEDA308.2080409@cybertec.at
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

Hi,

trying to review this one according to http://wiki.postgresql.org/wiki/Reviewing_a_Patch

# Is the patch in context diff format <http://en.wikipedia.org/wiki/Diff#Context_format>?

No. (Does this requirement still apply after PostgreSQL switched to Git?)

# Does it apply cleanly to the current git master?

No. The patches 01...09 in this series, taken from the mailing list, apply cleanly;
10 and 11 fail with rejects.

Best regards,
Zoltán Böszörményi

2012-06-13 13:28 keltezéssel, Andres Freund írta:
> From: Andres Freund <andres(at)anarazel(dot)de>
>
> For that add a 'node_id' parameter to most commands dealing with wal
> segments. A node_id that's 'InvalidMultimasterNodeId' references local wal;
> every other node_id refers to wal in a new pg_lcr directory.
>
> Using duplicated code would reduce the impact of that change, but the long-term
> code-maintenance burden outweighs that by a fair bit.
>
> Besides the decision to add a 'node_id' parameter to several functions the
> changes in this patch are fairly mechanical.
> ---
> src/backend/access/transam/xlog.c | 54 ++++++++++++++++-----------
> src/backend/replication/basebackup.c | 4 +-
> src/backend/replication/walreceiver.c | 2 +-
> src/backend/replication/walsender.c | 9 +++--
> src/bin/initdb/initdb.c | 1 +
> src/bin/pg_resetxlog/pg_resetxlog.c | 2 +-
> src/include/access/xlog.h | 2 +-
> src/include/access/xlog_internal.h | 13 +++++--
> src/include/replication/logical.h | 2 +
> src/include/replication/walsender_private.h | 2 +-
> 10 files changed, 56 insertions(+), 35 deletions(-)
>
> diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
> index 504b4d0..0622726 100644
> --- a/src/backend/access/transam/xlog.c
> +++ b/src/backend/access/transam/xlog.c
> @@ -635,8 +635,8 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
> static bool AdvanceXLInsertBuffer(bool new_segment);
> static bool XLogCheckpointNeeded(uint32 logid, uint32 logseg);
> static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
> -static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
> - bool find_free, int *max_advance,
> +static bool InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg,
> + char *tmppath, bool find_free, int *max_advance,
> bool use_lock);
> static int XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
> int source, bool notexistOk);
> @@ -1736,8 +1736,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
>
> /* create/use new log file */
> use_existent = true;
> - openLogFile = XLogFileInit(openLogId, openLogSeg,
> - &use_existent, true);
> + openLogFile = XLogFileInit(InvalidMultimasterNodeId, openLogId,
> + openLogSeg, &use_existent, true);
> openLogOff = 0;
> }
>
> @@ -2376,6 +2376,9 @@ XLogNeedsFlush(XLogRecPtr record)
> * place. This should be TRUE except during bootstrap log creation. The
> * caller must *not* hold the lock at call.
> *
> + * node_id: if != InvalidMultimasterNodeId this xlog file is actually an LCR
> + * file
> + *
> * Returns FD of opened file.
> *
> * Note: errors here are ERROR not PANIC because we might or might not be
> @@ -2384,8 +2387,8 @@ XLogNeedsFlush(XLogRecPtr record)
> * in a critical section.
> */
> int
> -XLogFileInit(uint32 log, uint32 seg,
> - bool *use_existent, bool use_lock)
> +XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
> + bool *use_existent, bool use_lock)
> {
> char path[MAXPGPATH];
> char tmppath[MAXPGPATH];
> @@ -2396,7 +2399,7 @@ XLogFileInit(uint32 log, uint32 seg,
> int fd;
> int nbytes;
>
> - XLogFilePath(path, ThisTimeLineID, log, seg);
> + XLogFilePath(path, ThisTimeLineID, node_id, log, seg);
>
> /*
> * Try to use existent file (checkpoint maker may have created it already)
> @@ -2425,6 +2428,11 @@ XLogFileInit(uint32 log, uint32 seg,
> */
> elog(DEBUG2, "creating and filling new WAL file");
>
> + /*
> + * FIXME: to be safe we need to create tempfile in the pg_lcr directory if
> + * it's actually an lcr file because pg_lcr might be in a different
> + * partition.
> + */
> snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
>
> unlink(tmppath);
> @@ -2493,7 +2501,7 @@ XLogFileInit(uint32 log, uint32 seg,
> installed_log = log;
> installed_seg = seg;
> max_advance = XLOGfileslop;
> - if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
> + if (!InstallXLogFileSegment(node_id, &installed_log, &installed_seg, tmppath,
> *use_existent, &max_advance,
> use_lock))
> {
> @@ -2548,7 +2556,7 @@ XLogFileCopy(uint32 log, uint32 seg,
> /*
> * Open the source file
> */
> - XLogFilePath(path, srcTLI, srclog, srcseg);
> + XLogFilePath(path, srcTLI, InvalidMultimasterNodeId, srclog, srcseg);
> srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
> if (srcfd < 0)
> ereport(ERROR,
> @@ -2619,7 +2627,8 @@ XLogFileCopy(uint32 log, uint32 seg,
> /*
> * Now move the segment into place with its final name.
> */
> - if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
> + if (!InstallXLogFileSegment(InvalidMultimasterNodeId, &log, &seg, tmppath,
> + false, NULL, false))
> elog(ERROR, "InstallXLogFileSegment should not have failed");
> }
>
> @@ -2653,14 +2662,14 @@ XLogFileCopy(uint32 log, uint32 seg,
> * file into place.
> */
> static bool
> -InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
> +InstallXLogFileSegment(RepNodeId node_id, uint32 *log, uint32 *seg, char *tmppath,
> bool find_free, int *max_advance,
> bool use_lock)
> {
> char path[MAXPGPATH];
> struct stat stat_buf;
>
> - XLogFilePath(path, ThisTimeLineID, *log, *seg);
> + XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);
>
> /*
> * We want to be sure that only one process does this at a time.
> @@ -2687,7 +2696,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
> }
> NextLogSeg(*log, *seg);
> (*max_advance)--;
> - XLogFilePath(path, ThisTimeLineID, *log, *seg);
> + XLogFilePath(path, ThisTimeLineID, node_id, *log, *seg);
> }
> }
>
> @@ -2736,7 +2745,7 @@ XLogFileOpen(uint32 log, uint32 seg)
> char path[MAXPGPATH];
> int fd;
>
> - XLogFilePath(path, ThisTimeLineID, log, seg);
> + XLogFilePath(path, ThisTimeLineID, InvalidMultimasterNodeId, log, seg);
>
> fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
> S_IRUSR | S_IWUSR);
> @@ -2783,7 +2792,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
>
> case XLOG_FROM_PG_XLOG:
> case XLOG_FROM_STREAM:
> - XLogFilePath(path, tli, log, seg);
> + XLogFilePath(path, tli, InvalidMultimasterNodeId, log, seg);
> restoredFromArchive = false;
> break;
>
> @@ -2804,7 +2813,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode, TimeLineID tli,
> bool reload = false;
> struct stat statbuf;
>
> - XLogFilePath(xlogfpath, tli, log, seg);
> + XLogFilePath(xlogfpath, tli, InvalidMultimasterNodeId, log, seg);
> if (stat(xlogfpath, &statbuf) == 0)
> {
> if (unlink(xlogfpath) != 0)
> @@ -2922,7 +2931,7 @@ XLogFileReadAnyTLI(uint32 log, uint32 seg, int emode, int sources)
> }
>
> /* Couldn't find it. For simplicity, complain about front timeline */
> - XLogFilePath(path, recoveryTargetTLI, log, seg);
> + XLogFilePath(path, recoveryTargetTLI, InvalidMultimasterNodeId, log, seg);
> errno = ENOENT;
> ereport(emode,
> (errcode_for_file_access(),
> @@ -3366,7 +3375,8 @@ PreallocXlogFiles(XLogRecPtr endptr)
> {
> NextLogSeg(_logId, _logSeg);
> use_existent = true;
> - lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
> + lf = XLogFileInit(InvalidMultimasterNodeId, _logId, _logSeg,
> + &use_existent, true);
> close(lf);
> if (!use_existent)
> CheckpointStats.ckpt_segs_added++;
> @@ -3486,8 +3496,9 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
> * separate archive directory.
> */
> if (lstat(path, &statbuf) == 0 && S_ISREG(statbuf.st_mode) &&
> - InstallXLogFileSegment(&endlogId, &endlogSeg, path,
> - true, &max_advance, true))
> + InstallXLogFileSegment(InvalidMultimasterNodeId, &endlogId,
> + &endlogSeg, path, true,
> + &max_advance, true))
> {
> ereport(DEBUG2,
> (errmsg("recycled transaction log file \"%s\"",
> @@ -5255,7 +5266,8 @@ BootStrapXLOG(void)
>
> /* Create first XLOG segment file */
> use_existent = false;
> - openLogFile = XLogFileInit(0, 1, &use_existent, false);
> + openLogFile = XLogFileInit(InvalidMultimasterNodeId, 0, 1,
> + &use_existent, false);
>
> /* Write the first page with the initial record */
> errno = 0;
> diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
> index 0bc88a4..47e4641 100644
> --- a/src/backend/replication/basebackup.c
> +++ b/src/backend/replication/basebackup.c
> @@ -245,7 +245,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
> char fn[MAXPGPATH];
> int i;
>
> - XLogFilePath(fn, ThisTimeLineID, logid, logseg);
> + XLogFilePath(fn, ThisTimeLineID, InvalidMultimasterNodeId, logid, logseg);
> _tarWriteHeader(fn, NULL, &statbuf);
>
> /* Send the actual WAL file contents, block-by-block */
> @@ -264,7 +264,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
> * http://lists.apple.com/archives/xcode-users/2003/Dec//msg000
> * 51.html
> */
> - XLogRead(buf, ptr, TAR_SEND_SIZE);
> + XLogRead(buf, InvalidMultimasterNodeId, ptr, TAR_SEND_SIZE);
> if (pq_putmessage('d', buf, TAR_SEND_SIZE))
> ereport(ERROR,
> (errmsg("base backup could not send data, aborting backup")));
> diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
> index 650b74f..e97196b 100644
> --- a/src/backend/replication/walreceiver.c
> +++ b/src/backend/replication/walreceiver.c
> @@ -509,7 +509,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
> /* Create/use new log file */
> XLByteToSeg(recptr, recvId, recvSeg);
> use_existent = true;
> - recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
> + recvFile = XLogFileInit(InvalidMultimasterNodeId, recvId, recvSeg, &use_existent, true);
> recvOff = 0;
> }
>
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index e44c734..8cd3a00 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -977,7 +977,7 @@ WalSndKill(int code, Datum arg)
> * more than one.
> */
> void
> -XLogRead(char *buf, XLogRecPtr startptr, Size count)
> +XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count)
> {
> char *p;
> XLogRecPtr recptr;
> @@ -1009,8 +1009,8 @@ retry:
> close(sendFile);
>
> XLByteToSeg(recptr, sendId, sendSeg);
> - XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
> -
> + XLogFilePath(path, ThisTimeLineID, node_id,
> + sendId, sendSeg);
> sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
> if (sendFile < 0)
> {
> @@ -1215,7 +1215,8 @@ XLogSend(char *msgbuf, bool *caughtup)
> * Read the log directly into the output buffer to avoid extra memcpy
> * calls.
> */
> - XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
> + XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), InvalidMultimasterNodeId,
> + startptr, nbytes);
>
> /*
> * We fill the message header last so that the send timestamp is taken as
> diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
> index 3789948..1f26382 100644
> --- a/src/bin/initdb/initdb.c
> +++ b/src/bin/initdb/initdb.c
> @@ -2637,6 +2637,7 @@ main(int argc, char *argv[])
> "global",
> "pg_xlog",
> "pg_xlog/archive_status",
> + "pg_lcr",
> "pg_clog",
> "pg_notify",
> "pg_serial",
> diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
> index 65ba910..7ee3a3a 100644
> --- a/src/bin/pg_resetxlog/pg_resetxlog.c
> +++ b/src/bin/pg_resetxlog/pg_resetxlog.c
> @@ -973,7 +973,7 @@ WriteEmptyXLOG(void)
>
> /* Write the first page */
> XLogFilePath(path, ControlFile.checkPointCopy.ThisTimeLineID,
> - newXlogId, newXlogSeg);
> + InvalidMultimasterNodeId, newXlogId, newXlogSeg);
>
> unlink(path);
>
> diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
> index dd89cff..3b02c0b 100644
> --- a/src/include/access/xlog.h
> +++ b/src/include/access/xlog.h
> @@ -268,7 +268,7 @@ extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
> extern void XLogFlush(XLogRecPtr RecPtr);
> extern bool XLogBackgroundFlush(void);
> extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
> -extern int XLogFileInit(uint32 log, uint32 seg,
> +extern int XLogFileInit(RepNodeId node_id, uint32 log, uint32 seg,
> bool *use_existent, bool use_lock);
> extern int XLogFileOpen(uint32 log, uint32 seg);
>
> diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
> index 3328a50..deadddf 100644
> --- a/src/include/access/xlog_internal.h
> +++ b/src/include/access/xlog_internal.h
> @@ -19,6 +19,7 @@
> #include "access/xlog.h"
> #include "fmgr.h"
> #include "pgtime.h"
> +#include "replication/logical.h"
> #include "storage/block.h"
> #include "storage/relfilenode.h"
>
> @@ -216,14 +217,11 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
> #define MAXFNAMELEN 64
>
> #define XLogFileName(fname, tli, log, seg) \
> - snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)
> + snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg);
>
> #define XLogFromFileName(fname, tli, log, seg) \
> sscanf(fname, "%08X%08X%08X", tli, log, seg)
>
> -#define XLogFilePath(path, tli, log, seg) \
> - snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg)
> -
> #define TLHistoryFileName(fname, tli) \
> snprintf(fname, MAXFNAMELEN, "%08X.history", tli)
>
> @@ -239,6 +237,13 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
> #define BackupHistoryFilePath(path, tli, log, seg, offset) \
> snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X.%08X.backup", tli, log, seg, offset)
>
> +/* FIXME: move to xlogutils.c, needs to fix sharing with receivexlog.c first though */
> +static inline int XLogFilePath(char* path, TimeLineID tli, RepNodeId node_id, uint32 log, uint32 seg){
> + if(node_id == InvalidMultimasterNodeId)
> + return snprintf(path, MAXPGPATH, XLOGDIR "/%08X%08X%08X", tli, log, seg);
> + else
> + return snprintf(path, MAXPGPATH, LCRDIR "/%d/%08X%08X%08X", node_id, tli, log, seg);
> +}
>
> /*
> * Method table for resource managers.
> diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
> index 0698b61..8f44fad 100644
> --- a/src/include/replication/logical.h
> +++ b/src/include/replication/logical.h
> @@ -19,4 +19,6 @@ extern XLogRecPtr current_replication_origin_lsn;
>
> #define InvalidMultimasterNodeId 0
> #define MaxMultimasterNodeId (2<<3)
> +
> +#define LCRDIR "pg_lcr"
> #endif
> diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
> index 66234cd..bc58ff4 100644
> --- a/src/include/replication/walsender_private.h
> +++ b/src/include/replication/walsender_private.h
> @@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl;
>
>
> extern void WalSndSetState(WalSndState state);
> -extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
> +extern void XLogRead(char *buf, RepNodeId node_id, XLogRecPtr startptr, Size count);
>
> /*
> * Internal functions for parsing the replication grammar, in repl_gram.y and

--
----------------------------------
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt, Austria
Web: http://www.postgresql-support.de
http://www.postgresql.at/

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Andres Freund 2012-06-29 12:50:58 Re: [PATCH 10/16] Introduce the concept that wal has a 'origin' node
Previous Message Boszormenyi Zoltan 2012-06-29 12:43:49 Re: [PATCH 10/16] Introduce the concept that wal has a 'origin' node