Re: Minimal logical decoding on standbys

From: "Drouvot, Bertrand" <bertranddrouvot(dot)pg(at)gmail(dot)com>
To: Andres Freund <andres(at)anarazel(dot)de>
Cc: Robert Haas <robertmhaas(at)gmail(dot)com>, Jeff Davis <pgsql(at)j-davis(dot)com>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, Alvaro Herrera <alvherre(at)2ndquadrant(dot)com>, Ibrar Ahmed <ibrar(dot)ahmad(at)gmail(dot)com>, Amit Khandekar <amitdkhan(dot)pg(at)gmail(dot)com>, fabriziomello(at)gmail(dot)com, tushar <tushar(dot)ahuja(at)enterprisedb(dot)com>, Rahila Syed <rahila(dot)syed(at)2ndquadrant(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>, Melanie Plageman <melanieplageman(at)gmail(dot)com>
Subject: Re: Minimal logical decoding on standbys
Date: 2023-04-03 15:20:55
Message-ID: 86dccc77-0722-63b9-4c83-fb6928691387@gmail.com
Lists: pgsql-hackers

Hi,

On 4/2/23 10:10 PM, Andres Freund wrote:
> Hi,
>
> Btw, most of the patches have some things that pgindent will change (and some
> that my editor will highlight). It wouldn't hurt to run pgindent for the later
> patches...

Done.

>
> Pushed the WAL format change.
>

Thanks!

>
> On 2023-04-02 10:27:45 +0200, Drouvot, Bertrand wrote:
>> 5.3% doc/src/sgml/
>> 6.2% src/backend/access/transam/
>> 4.6% src/backend/replication/logical/
>> 55.6% src/backend/replication/
>> 4.4% src/backend/storage/ipc/
>> 6.9% src/backend/tcop/
>> 5.3% src/backend/
>> 3.8% src/include/catalog/
>> 5.3% src/include/replication/
>
> I think it might be worth trying to split this up a bit.
>

Okay. V56 (enclosed) splits it into two parts:

one part to handle logical slot conflicts on standby, and one part
to add a new field to pg_stat_database_conflicts and to pg_replication_slots.

>
>> restart_lsn = s->data.restart_lsn;
>> -
>> - /*
>> - * If the slot is already invalid or is fresh enough, we don't need to
>> - * do anything.
>> - */
>> - if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> + slot_xmin = s->data.xmin;
>> + slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> + /* the slot has been invalidated (logical decoding conflict case) */
>> + if ((xid && ((LogicalReplicationSlotIsInvalid(s)) ||
>> + /* or the xid is valid and this is a non conflicting slot */
>> + (TransactionIdIsValid(*xid) && !(LogicalReplicationSlotXidsConflict(slot_xmin, slot_catalog_xmin, *xid))))) ||
>> + /* or the slot has been invalidated (obsolete LSN case) */
>> + (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>> {
>
> This still looks nearly unreadable. I suggest moving comments outside of the
> if (), remove redundant parentheses, use a function to detect if the slot has
> been invalidated.
>

I made it as simple as:

/*
* If the slot is already invalid or is a non conflicting slot, we don't
* need to do anything.
*/
islogical = xid ? true : false;

if (SlotIsInvalid(s, islogical) || SlotIsNotConflicting(s, islogical, xid, &oldestLSN))

in V56 attached.
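
For readers without the patch at hand, the two predicates could be shaped roughly as follows. This is only a sketch under stated assumptions, not the code from V56: SlotSketch and the bare integer typedefs are stand-ins for the real PostgreSQL types, xid_precedes_or_equals() is a simplified replacement for TransactionIdPrecedesOrEquals(), and only the function names and argument order follow the call shown above. The logic mirrors the condition quoted earlier: an invalid xid means "wal_level too low", which is always treated as a conflict.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in types for this sketch only; not the PostgreSQL definitions. */
typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId ((TransactionId) 0)
#define InvalidXLogRecPtr    ((XLogRecPtr) 0)

typedef struct SlotSketch
{
    TransactionId xmin;
    TransactionId catalog_xmin;
    XLogRecPtr    restart_lsn;
    XLogRecPtr    invalidated_at;
} SlotSketch;

/* Simplified modulo-2^32 "a <= b", meaningful for normal xids only. */
static bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) <= 0;
}

/* Has the slot already been invalidated? */
static bool
SlotIsInvalid(const SlotSketch *s, bool islogical)
{
    if (islogical)
        return s->xmin == InvalidTransactionId &&
               s->catalog_xmin == InvalidTransactionId;

    return s->invalidated_at != InvalidXLogRecPtr &&
           s->restart_lsn == InvalidXLogRecPtr;
}

/* Is the slot unaffected by the given xid horizon (or LSN limit)? */
static bool
SlotIsNotConflicting(const SlotSketch *s, bool islogical,
                     const TransactionId *xid, const XLogRecPtr *oldestLSN)
{
    if (islogical)
    {
        assert(xid != NULL);

        /* Invalid xid: wal_level was lowered on the primary, always a conflict. */
        if (*xid == InvalidTransactionId)
            return false;

        return !((s->xmin != InvalidTransactionId &&
                  xid_precedes_or_equals(s->xmin, *xid)) ||
                 (s->catalog_xmin != InvalidTransactionId &&
                  xid_precedes_or_equals(s->catalog_xmin, *xid)));
    }

    /* Obsolete-LSN case: the slot is fresh enough. */
    return s->restart_lsn >= *oldestLSN;
}
```

With both checks behind named functions, the caller's if () stays a single line and the explanatory comments can live above each helper.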

>
>> @@ -1329,16 +1345,45 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> */
>> if (last_signaled_pid != active_pid)
>> {
>> + bool send_signal = false;
>> +
>> + initStringInfo(&err_msg);
>> + initStringInfo(&err_detail);
>> +
>> + appendStringInfo(&err_msg, "terminating process %d to release replication slot \"%s\"",
>> + active_pid,
>> + NameStr(slotname));
>
> For this to be translatable you need to use _("message").

Thanks!

>
>
>> + if (xid)
>> + {
>> + appendStringInfo(&err_msg, " because it conflicts with recovery");
>> + send_signal = true;
>> +
>> + if (TransactionIdIsValid(*xid))
>> + appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
>> + else
>> + appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
>> + }
>> + else
>> + {
>> + appendStringInfo(&err_detail, "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> + LSN_FORMAT_ARGS(restart_lsn),
>> + (unsigned long long) (oldestLSN - restart_lsn));
>> + }
>> +
>> ereport(LOG,
>> - errmsg("terminating process %d to release replication slot \"%s\"",
>> - active_pid, NameStr(slotname)),
>> - errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> - LSN_FORMAT_ARGS(restart_lsn),
>> - (unsigned long long) (oldestLSN - restart_lsn)),
>> - errhint("You might need to increase max_slot_wal_keep_size."));
>> -
>> - (void) kill(active_pid, SIGTERM);
>> + errmsg("%s", err_msg.data),
>> + errdetail("%s", err_detail.data),
>> + send_signal ? 0 : errhint("You might need to increase max_slot_wal_keep_size."));
>> +
>> + if (send_signal)
>> + (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
>> + else
>> + (void) kill(active_pid, SIGTERM);
>> +
>> last_signaled_pid = active_pid;
>> +
>> + pfree(err_msg.data);
>> + pfree(err_detail.data);
>> }
>>
>> /* Wait until the slot is released. */
>> @@ -1355,6 +1400,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> }
>> else
>> {
>> + bool hint = false;;
>> +
>> + initStringInfo(&err_msg);
>> + initStringInfo(&err_detail);
>> +
>> /*
>> * We hold the slot now and have already invalidated it; flush it
>> * to ensure that state persists.
>> @@ -1370,14 +1420,37 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> ReplicationSlotMarkDirty();
>> ReplicationSlotSave();
>> ReplicationSlotRelease();
>> + pgstat_drop_replslot(s);
>> +
>> + appendStringInfo(&err_msg, "invalidating");
>> +
>> + if (xid)
>> + {
>> + if (TransactionIdIsValid(*xid))
>> + appendStringInfo(&err_detail, "The slot conflicted with xid horizon %u.", *xid);
>> + else
>> + appendStringInfo(&err_detail, "Logical decoding on standby requires wal_level to be at least logical on the primary server");
>> + }
>> + else
>
> These are nearly the same messages as above. This is too much code to duplicate
> between terminating and invalidating. Put this into a helper or such.
>

ReportTerminationInvalidation() added in V56 for this purpose.
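
To illustrate the kind of de-duplication asked for, here is a self-contained sketch of one formatter shared by the "terminating" and "invalidating" paths. It is not the body of ReportTerminationInvalidation(): the function name format_slot_report(), its signature, and the wording of the "invalidating" message are assumptions made for this example, and plain snprintf() stands in for StringInfo and ereport(). The detail strings are the ones from the quoted hunk.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

typedef uint32_t TransactionId;     /* stand-in for the PostgreSQL type */
#define InvalidTransactionId ((TransactionId) 0)

/*
 * Hypothetical helper: build the message and detail once for both paths.
 * terminating_pid != 0 selects the "terminating process" wording;
 * xid == NULL selects the obsolete-LSN case.
 */
static void
format_slot_report(int terminating_pid, const char *slotname,
                   const TransactionId *xid,
                   uint64_t restart_lsn, uint64_t oldest_lsn,
                   char *msg, size_t msglen,
                   char *detail, size_t detaillen)
{
    assert(msglen > 0 && detaillen > 0);

    if (terminating_pid != 0)
        snprintf(msg, msglen,
                 "terminating process %d to release replication slot \"%s\"",
                 terminating_pid, slotname);
    else
        /* Assumed wording; the quoted hunk only shows "invalidating". */
        snprintf(msg, msglen, "invalidating replication slot \"%s\"", slotname);

    if (xid != NULL)
    {
        size_t used = strlen(msg);

        /* Assumption: the suffix is appended on both paths. */
        snprintf(msg + used, msglen - used, " because it conflicts with recovery");

        if (*xid != InvalidTransactionId)
            snprintf(detail, detaillen,
                     "The slot conflicted with xid horizon %u.",
                     (unsigned int) *xid);
        else
            snprintf(detail, detaillen,
                     "Logical decoding on standby requires wal_level to be at least logical on the primary server");
    }
    else
        snprintf(detail, detaillen,
                 "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
                 (unsigned int) (restart_lsn >> 32), (unsigned int) restart_lsn,
                 (unsigned long long) (oldest_lsn - restart_lsn));
}
```

In the real code the fixed strings would still need to go through _() to stay translatable, as noted in the review comment above; this sketch leaves that out.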

>
>> @@ -3099,6 +3102,31 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>> /* Intentional fall through to session cancel */
>> /* FALLTHROUGH */
>>
>> + case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
>
> The case: above is explicitly falling through. This makes no sense here as far
> as I can tell.

The PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT case contains an
"if (reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT)" check, so falling through into it seems OK to me.

Or are you saying that you'd prefer to see the PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT case somewhere else?
If so, where?
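
A toy model of the control flow under discussion may help. Everything in it is hypothetical (the enum, the outcome struct, and the assumption that the logical slot case itself continues to session cancel); it only demonstrates the point about the guard: when control reaches the new case by falling through from the case above, "reason" still holds the original value, so the guarded block is skipped.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the ProcSignalReason values involved. */
typedef enum SketchReason
{
    SKETCH_CONFLICT_SNAPSHOT,
    SKETCH_CONFLICT_LOGICALSLOT,
    SKETCH_CONFLICT_DATABASE
} SketchReason;

typedef struct SketchOutcome
{
    bool query_cancel;
    bool logicalslot_handling;
    bool session_cancel;
} SketchOutcome;

static SketchOutcome
handle_conflict(SketchReason reason, bool in_subtransaction)
{
    SketchOutcome out = {false, false, false};

    switch (reason)
    {
        case SKETCH_CONFLICT_SNAPSHOT:
            if (!in_subtransaction)
            {
                out.query_cancel = true;
                break;
            }
            /* Intentional fall through to session cancel */
            /* FALLTHROUGH */

        case SKETCH_CONFLICT_LOGICALSLOT:
            /*
             * Reached either directly or from the case above. The guard
             * restricts the slot-specific work to the direct case.
             */
            if (reason == SKETCH_CONFLICT_LOGICALSLOT)
                out.logicalslot_handling = true;
            /* FALLTHROUGH (assumed for this sketch) */

        case SKETCH_CONFLICT_DATABASE:
            out.session_cancel = true;
            break;

        default:
            assert(false);
    }

    return out;
}
```

The guard makes the placement functionally safe, but the "fall through to session cancel" comment now lands on a different case label than before, which is a readability cost of putting the new case there.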

> I thought you did change this in response to my last comment
> about it?
>

Yes.

>> index 8872c80cdf..013cd2b4d0 100644
>> --- a/src/include/replication/slot.h
>> +++ b/src/include/replication/slot.h
>> @@ -17,6 +17,17 @@
>> #include "storage/spin.h"
>> #include "replication/walreceiver.h"
>>
>> +#define ObsoleteSlotIsInvalid(s) (!XLogRecPtrIsInvalid(s->data.invalidated_at) && \
>> + XLogRecPtrIsInvalid(s->data.restart_lsn))
>> +
>> +#define LogicalReplicationSlotIsInvalid(s) (!TransactionIdIsValid(s->data.xmin) && \
>> + !TransactionIdIsValid(s->data.catalog_xmin))
>> +
>> +#define SlotIsInvalid(s) (ObsoleteSlotIsInvalid(s) || LogicalReplicationSlotIsInvalid (s))
>> +
>> +#define LogicalReplicationSlotXidsConflict(slot_xmin, catalog_xmin, xid) \
>> + ((TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin, xid)) || \
>> + (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid)))
>
> Can you make these static inlines instead?
>
>

Done.
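
As a small illustration of why static inlines help here: the quoted LogicalReplicationSlotXidsConflict macro names its second parameter catalog_xmin but its body reads slot_catalog_xmin, so it only compiles when the caller happens to have a local variable of that name. A function makes that kind of mismatch a compile error. The sketch below uses stand-in types and a simplified copy of the wraparound-aware comparison; the names slot_xids_conflict() and xid_precedes_or_equals() are made up for this example and are not the ones used in V56.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;     /* stand-in for the PostgreSQL type */
#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

static inline bool
xid_is_valid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/* Simplified TransactionIdPrecedesOrEquals(): modulo-2^32 for normal xids. */
static inline bool
xid_precedes_or_equals(TransactionId a, TransactionId b)
{
    if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
        return a <= b;

    return (int32_t) (a - b) <= 0;
}

/*
 * Function form of the conflict test. Every name used in the body is a
 * real parameter, and each argument is evaluated exactly once.
 */
static inline bool
slot_xids_conflict(TransactionId slot_xmin, TransactionId slot_catalog_xmin,
                   TransactionId xid)
{
    /* The caller is expected to have checked the horizon xid already. */
    assert(xid_is_valid(xid));

    return (xid_is_valid(slot_xmin) &&
            xid_precedes_or_equals(slot_xmin, xid)) ||
           (xid_is_valid(slot_catalog_xmin) &&
            xid_precedes_or_equals(slot_catalog_xmin, xid));
}
```

For example, slot_xids_conflict(100, 0, 150) reports a conflict because the slot's xmin is at or behind the horizon, while slot_xids_conflict(200, 300, 150) does not.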

>
>
>> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
>> index 8fe7bb65f1..8457eec4c4 100644
>> --- a/src/backend/replication/logical/decode.c
>> +++ b/src/backend/replication/logical/decode.c
>> @@ -152,11 +152,31 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
>> * can restart from there.
>> */
>> break;
>> + case XLOG_PARAMETER_CHANGE:
>> + {
>> + xl_parameter_change *xlrec =
>> + (xl_parameter_change *) XLogRecGetData(buf->record);
>> +
>> + /*
>> + * If wal_level on primary is reduced to less than logical, then we
>> + * want to prevent existing logical slots from being used.
>> + * Existing logical slots on standby get invalidated when this WAL
>> + * record is replayed; and further, slot creation fails when the
>> + * wal level is not sufficient; but all these operations are not
>> + * synchronized, so a logical slot may creep in while the wal_level
>> + * is being reduced. Hence this extra check.
>> + */
>> + if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
>> + ereport(ERROR,
>> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
>> + errmsg("logical decoding on standby requires wal_level "
>> + "to be at least logical on the primary server")));
>
> Please don't break error messages into multiple lines, makes it harder to grep
> for.
>
Done.

Regards,

--
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com

Attachment                                                       Content-Type  Size
v56-0006-Doc-changes-describing-details-about-logical-dec.patch  text/plain    2.2 KB
v56-0005-New-TAP-test-for-logical-decoding-on-standby.patch      text/plain    32.9 KB
v56-0004-Fixing-Walsender-corner-case-with-logical-decodi.patch  text/plain    7.8 KB
v56-0003-Allow-logical-decoding-on-standby.patch                 text/plain    12.0 KB
v56-0002-Arrange-for-a-new-pg_stat_database_conflicts-and.patch  text/plain    10.4 KB
v56-0001-Handle-logical-slot-conflicts-on-standby.patch          text/plain    28.4 KB
