From: | Andres Freund <andres(at)anarazel(dot)de> |
---|---|
To: | "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com> |
Cc: | Robert Haas <robertmhaas(at)gmail(dot)com>, Jeff Davis <pgsql(at)j-davis(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Melanie Plageman <melanieplageman(at)gmail(dot)com> |
Subject: | Re: Minimal logical decoding on standbys |
Date: | 2023-04-02 20:10:35 |
Message-ID: | 20230402201035.uqkgcsvb7n3u2x3n@awork3.anarazel.de |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-hackers |
Hi,
Btw, most of the patches have some things that pgindent will change (and some
that my editor will highlight). It wouldn't hurt to run pgindent for the later
patches...
Pushed the WAL format change.
On 2023-04-02 10:27:45 +0200, Drouvot, Bertrand wrote:
> During WAL replay on standby, when slot conflict is identified,
> invalidate such slots. Also do the same thing if wal_level on the primary server
> is reduced to below logical and there are existing logical slots
> on standby. Introduce a new ProcSignalReason value for slot
> conflict recovery. Arrange for a new pg_stat_database_conflicts field:
> confl_active_logicalslot.
>
> Add a new field "conflicting" in pg_replication_slots.
>
> Author: Andres Freund (in an older version), Amit Khandekar, Bertrand Drouvot
> Reviewed-By: Bertrand Drouvot, Andres Freund, Robert Haas, Fabrizio de Royes Mello,
> Bharath Rupireddy
> ---
> doc/src/sgml/monitoring.sgml | 11 +
> doc/src/sgml/system-views.sgml | 10 +
> src/backend/access/gist/gistxlog.c | 2 +
> src/backend/access/hash/hash_xlog.c | 1 +
> src/backend/access/heap/heapam.c | 3 +
> src/backend/access/nbtree/nbtxlog.c | 2 +
> src/backend/access/spgist/spgxlog.c | 1 +
> src/backend/access/transam/xlog.c | 20 +-
> src/backend/catalog/system_views.sql | 6 +-
> .../replication/logical/logicalfuncs.c | 13 +-
> src/backend/replication/slot.c | 189 ++++++++++++++----
> src/backend/replication/slotfuncs.c | 16 +-
> src/backend/replication/walsender.c | 7 +
> src/backend/storage/ipc/procsignal.c | 3 +
> src/backend/storage/ipc/standby.c | 13 +-
> src/backend/tcop/postgres.c | 28 +++
> src/backend/utils/activity/pgstat_database.c | 4 +
> src/backend/utils/adt/pgstatfuncs.c | 3 +
> src/include/catalog/pg_proc.dat | 11 +-
> src/include/pgstat.h | 1 +
> src/include/replication/slot.h | 14 +-
> src/include/storage/procsignal.h | 1 +
> src/include/storage/standby.h | 2 +
> src/test/regress/expected/rules.out | 8 +-
> 24 files changed, 308 insertions(+), 61 deletions(-)
> 5.3% doc/src/sgml/
> 6.2% src/backend/access/transam/
> 4.6% src/backend/replication/logical/
> 55.6% src/backend/replication/
> 4.4% src/backend/storage/ipc/
> 6.9% src/backend/tcop/
> 5.3% src/backend/
> 3.8% src/include/catalog/
> 5.3% src/include/replication/
I think it might be worth trying to split this up a bit.
> restart_lsn = s->data.restart_lsn;
> -
> - /*
> - * If the slot is already invalid or is fresh enough, we don't need to
> - * do anything.
> - */
> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> + slot_xmin = s->data.xmin;
> + slot_catalog_xmin = s->data.catalog_xmin;
> +
> + /* the slot has been invalidated (logical decoding conflict case) */
> + if ((xid && ((LogicalReplicationSlotIsInvalid(s)) ||
> + /* or the xid is valid and this is a non conflicting slot */
> + (TransactionIdIsValid(*xid) && !(LogicalReplicationSlotXidsConflict(slot_xmin, slot_catalog_xmin, *xid))))) ||
> + /* or the slot has been invalidated (obsolete LSN case) */
> + (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
> {
This still looks nearly unreadable. I suggest moving comments outside of the
if (), remove redundant parentheses, use a function to detect if the slot has
been invalidated.
> @@ -1329,16 +1345,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> */
> if (last_signaled_pid != active_pid)
> {
> + bool send_signal = false;
> +
> + initStringInfo(&err_msg);
> + initStringInfo(&err_detail);
> +
> + appendStringInfo(&err_msg, "terminating process %d to release replication slot \"%s\"",
> + active_pid,
> + NameStr(slotname));
For this to be translatable you need to use _("message").
> + if (xid)
> + {
> + appendStringInfo(&err_msg, " because it conflicts with recovery");
> + send_signal = true;
> +
> + if (TransactionIdIsValid(*xid))
> + appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
> + else
> + appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
> + }
> + else
> + {
> + appendStringInfo(&err_detail, "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> + LSN_FORMAT_ARGS(restart_lsn),
> + (unsigned long long) (oldestLSN - restart_lsn));
> + }
> +
> ereport(LOG,
> - errmsg("terminating process %d to release replication slot \"%s\"",
> - active_pid, NameStr(slotname)),
> - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
> - LSN_FORMAT_ARGS(restart_lsn),
> - (unsigned long long) (oldestLSN - restart_lsn)),
> - errhint("You might need to increase max_slot_wal_keep_size."));
> -
> - (void) kill(active_pid, SIGTERM);
> + errmsg("%s", err_msg.data),
> + errdetail("%s", err_detail.data),
> + send_signal ? 0 : errhint("You might need to increase max_slot_wal_keep_size."));
> +
> + if (send_signal)
> + (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
> + else
> + (void) kill(active_pid, SIGTERM);
> +
> last_signaled_pid = active_pid;
> +
> + pfree(err_msg.data);
> + pfree(err_detail.data);
> }
>
> /* Wait until the slot is released. */
> @@ -1355,6 +1400,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> }
> else
> {
> + bool hint = false;;
> +
> + initStringInfo(&err_msg);
> + initStringInfo(&err_detail);
> +
> /*
> * We hold the slot now and have already invalidated it; flush it
> * to ensure that state persists.
> @@ -1370,14 +1420,37 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> ReplicationSlotMarkDirty();
> ReplicationSlotSave();
> ReplicationSlotRelease();
> + pgstat_drop_replslot(s);
> +
> + appendStringInfo(&err_msg, "invalidating");
> +
> + if (xid)
> + {
> + if (TransactionIdIsValid(*xid))
> + appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
> + else
> + appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
> + }
> + else
These are nearly the same messags as above. This is too much code to duplicate
between terminating and invalidating. Put this into a helper or such.
> @@ -3099,6 +3102,31 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
> /* Intentional fall through to session cancel */
> /* FALLTHROUGH */
>
> + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
The case: above is explicitl falling through. This makes no sense here as far
as I can tell. I thought you did change this in response to my last comment
about it?
> index 8872c80cdf..013cd2b4d0 100644
> --- a/src/include/replication/slot.h
> +++ b/src/include/replication/slot.h
> @@ -17,6 +17,17 @@
> #include "storage/spin.h"
> #include "replication/walreceiver.h"
>
> +#define ObsoleteSlotIsInvalid(s) (!XLogRecPtrIsInvalid(s->data.invalidated_at) && \
> + XLogRecPtrIsInvalid(s->data.restart_lsn))
> +
> +#define LogicalReplicationSlotIsInvalid(s) (!TransactionIdIsValid(s->data.xmin) && \
> + !TransactionIdIsValid(s->data.catalog_xmin))
> +
> +#define SlotIsInvalid(s) (ObsoleteSlotIsInvalid(s) || LogicalReplicationSlotIsInvalid (s))
> +
> +#define LogicalReplicationSlotXidsConflict(slot_xmin, catalog_xmin, xid) \
> + ((TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid)) || \
> + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)))
Can you make these static inlines instead?
> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
> index 8fe7bb65f1..8457eec4c4 100644
> --- a/src/backend/replication/logical/decode.c
> +++ b/src/backend/replication/logical/decode.c
> @@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> * can restart from there.
> */
> break;
> + case XLOG_PARAMETER_CHANGE:
> + {
> + xl_parameter_change *xlrec =
> + (xl_parameter_change *) XLogRecGetData(buf->record);
> +
> + /*
> + * If wal_level on primary is reduced to less than logical, then we
> + * want to prevent existing logical slots from being used.
> + * Existing logical slots on standby get invalidated when this WAL
> + * record is replayed; and further, slot creation fails when the
> + * wal level is not sufficient; but all these operations are not
> + * synchronized, so a logical slot may creep in while the wal_level
> + * is being reduced. Hence this extra check.
> + */
> + if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("logical decoding on standby requires wal_level "
> + "to be at least logical on the primary server")));
Please don't break error messages into multiple lines, makes it harder to grep
for.
Greetings,
Andres Freund
From | Date | Subject | |
---|---|---|---|
Next Message | Melanie Plageman | 2023-04-02 20:11:47 | Re: Option to not use ringbuffer in VACUUM, using it in failsafe mode |
Previous Message | Jeff Davis | 2023-04-02 20:01:46 | Re: Minimal logical decoding on standbys |