From: | Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> |
---|---|
To: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> |
Cc: | vignesh C <vignesh21(at)gmail(dot)com>, Greg Nancarrow <gregn4422(at)gmail(dot)com>, Peter Eisentraut <peter(dot)eisentraut(at)enterprisedb(dot)com>, "houzj(dot)fnst(at)fujitsu(dot)com" <houzj(dot)fnst(at)fujitsu(dot)com>, "tanghy(dot)fnst(at)fujitsu(dot)com" <tanghy(dot)fnst(at)fujitsu(dot)com>, "osumi(dot)takamichi(at)fujitsu(dot)com" <osumi(dot)takamichi(at)fujitsu(dot)com>, Alexey Lesovsky <lesovsky(at)gmail(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org> |
Subject: | Re: Skipping logical replication transactions on subscriber side |
Date: | 2022-01-17 04:18:49 |
Message-ID: | CAD21AoCd3Y2-b67+pVrzrdteUmup1XG6JeHYOa5dGjh8qZ3VuQ@mail.gmail.com |
Lists: | pgsql-hackers |
On Sat, Jan 15, 2022 at 7:24 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com> wrote:
>
> On Fri, Jan 14, 2022 at 7:49 AM Masahiko Sawada <sawada(dot)mshk(at)gmail(dot)com> wrote:
> >
> > I agree with all the comments above. I've attached an updated patch.
> >
>
> Review comments
> ================
Thank you for the comments!
> 1.
> +
> + <para>
> + In this case, you need to consider changing the data on the
> subscriber so that it
>
> The starting of this sentence doesn't make sense to me. How about
> changing it like: "To resolve conflicts, you need to ...
>
Fixed.
> 2.
> + <structname>pg_subscription</structname>.<structfield>subskipxid</structfield>)
> + is cleared. See <xref linkend="logical-replication-conflicts"/> for
> + the details of logical replication conflicts.
> + </para>
> +
> + <para>
> + <replaceable>skip_option</replaceable> specifies options for
> this operation.
> + The supported option is:
> +
> + <variablelist>
> + <varlistentry>
> + <term><literal>xid</literal> (<type>xid</type>)</term>
> + <listitem>
> + <para>
> + Specifies the ID of the transaction whose changes are to be skipped
> + by the logical replication worker. Setting
> <literal>NONE</literal> resets
> + the transaction ID.
> + </para>
>
> Empty spaces after line finish are inconsistent. I personally use a
> single space before a new line but I see that others use two spaces
> and the nearby documentation also uses two spaces in this regard so I
> am fine either way but let's be consistent.
Fixed.
>
> 3.
> + case ALTER_SUBSCRIPTION_SKIP:
> + {
> + if (!superuser())
> + ereport(ERROR,
> + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
> + errmsg("must be superuser to skip transaction")));
> +
> + parse_subscription_options(pstate, stmt->options, SUBOPT_XID, &opts);
> +
> + if (IsSet(opts.specified_opts, SUBOPT_XID))
> ..
> ..
>
> Is there a case when the above 'if (IsSet(..' won't be true? If not,
> then probably there should be Assert instead of 'if'.
>
Fixed.
> 4.
> +static TransactionId skipping_xid = InvalidTransactionId;
>
> I find this variable name bit odd. Can we name it skip_xid?
>
Okay, renamed.
> 5.
> + * skipping_xid is valid if we are skipping all data modification changes
> + * (INSERT, UPDATE, etc.) of the specified transaction at
> MySubscription->skipxid.
> + * Once we start skipping changes, we don't stop it until we skip all changes
>
> I think it would be better to write the first line of comment as: "We
> enable skipping all data modification changes (INSERT, UPDATE, etc.)
> for the subscription if the user has specified skip_xid. Once we ..."
>
Changed.
> 6.
> +static void
> +maybe_start_skipping_changes(TransactionId xid)
> +{
> + Assert(!is_skipping_changes());
> + Assert(!in_remote_transaction);
> + Assert(!in_streamed_transaction);
> +
> + /* Make sure subscription cache is up-to-date */
> + maybe_reread_subscription();
>
> Why do we need to update the cache here by calling
> maybe_reread_subscription() and at other places in the patch? It is
> sufficient to get the skip_xid value at the start of the worker via
> GetSubscription().
MySubscription could be out-of-date after a user changes the catalog.
In non-skipping cases, we check it when starting the transaction in
begin_replication_step(), which is called, e.g., when applying an
insert change. But I think we need to make sure it’s up-to-date at the
beginning of applying changes, that is, before starting a transaction.
Otherwise, we may end up skipping the transaction based on an
out-of-date subscription cache.
The reason for calling maybe_reread_subscription() in both
apply_handle_commit_prepared() and apply_handle_rollback_prepared() is
the same: MySubscription could be out-of-date when applying
commit-prepared or rollback-prepared since we have not called
begin_replication_step() to open a new transaction.
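As a rough illustration of the stale-cache concern, here is a toy model, not code from the patch. All names in it (catalog_skip_xid, cached_skip_xid, reread_subscription, should_skip) are hypothetical stand-ins, and the real worker rereads the subscription through the catalog rather than a global variable. It only shows that a skip decision made without refreshing the cache can go wrong in both directions.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical stand-in for pg_subscription.subskipxid in the catalog. */
static TransactionId catalog_skip_xid = InvalidTransactionId;

/* Hypothetical stand-in for the worker's cached MySubscription->skipxid. */
static TransactionId cached_skip_xid = InvalidTransactionId;

/* Toy version of rereading the subscription: copy catalog into cache. */
static void
reread_subscription(void)
{
    cached_skip_xid = catalog_skip_xid;
}

/*
 * Decide at the start of a remote transaction whether to skip it.
 * If refresh is false, the decision relies on whatever the cache holds.
 */
static bool
should_skip(TransactionId remote_xid, bool refresh)
{
    if (refresh)
        reread_subscription();

    return cached_skip_xid != InvalidTransactionId &&
        cached_skip_xid == remote_xid;
}
```

If the user sets the skip XID after the cache was last filled, should_skip(xid, false) misses it; if the user resets it to NONE, should_skip(xid, false) still skips. Passing refresh = true avoids both cases in this model.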
>
> 7. In maybe_reread_subscription(), isn't there a need to check whether
> skip_xid is changed where we exit and launch the worker and compare
> other subscription parameters?
IIUC we relaunch the worker here when subscription parameters such as
slot_name are changed. In the current implementation, I think that
relaunching the worker is not strictly necessary when skip_xid is
changed. For instance, when skipping a prepared transaction, we
deliberately don’t clear subskipxid of pg_subscription at prepare time
and do that at commit-prepared or rollback-prepared instead. There is a
chance that the user changes skip_xid before commit-prepared or
rollback-prepared, but we tolerate this case.
Also, in non-streaming and non-2PC cases, while skipping changes we
don’t call maybe_reread_subscription() until all changes are skipped.
So changing skip_xid cannot cancel skipping that has already started.
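The last point can be sketched with a small state machine. This is a toy model under stated assumptions, not the apply worker code: ToyWorker, toy_begin, toy_change, and toy_commit are hypothetical names, and the catalog value is passed in as a plain argument. It shows that once skipping starts, later catalog changes are not looked at until the transaction ends.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical worker state; not the real apply worker structures. */
typedef struct ToyWorker
{
    TransactionId cached_skip_xid;  /* stands in for MySubscription->skipxid */
    TransactionId skip_xid;         /* valid only while skipping */
    int           applied;          /* number of changes applied */
    int           skipped;          /* number of changes skipped */
} ToyWorker;

/* BEGIN: refresh the cache from the catalog value, then decide once. */
static void
toy_begin(ToyWorker *w, TransactionId catalog_skip_xid,
          TransactionId remote_xid)
{
    assert(w->skip_xid == InvalidTransactionId);    /* not already skipping */

    w->cached_skip_xid = catalog_skip_xid;
    if (w->cached_skip_xid != InvalidTransactionId &&
        w->cached_skip_xid == remote_xid)
        w->skip_xid = remote_xid;
}

/* A data change: while skipping, the catalog is not consulted at all. */
static void
toy_change(ToyWorker *w, TransactionId catalog_skip_xid)
{
    if (w->skip_xid != InvalidTransactionId)
    {
        w->skipped++;
        return;
    }

    /* In this model the reread happens only on the apply path. */
    w->cached_skip_xid = catalog_skip_xid;
    w->applied++;
}

/* COMMIT: skipping always ends together with the transaction. */
static void
toy_commit(ToyWorker *w)
{
    w->skip_xid = InvalidTransactionId;
}
```

In this model, resetting the catalog value after toy_begin() has decided to skip has no effect on the remaining changes of that transaction; it only matters for the next toy_begin().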
>
> 8.
> +static void
> +clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
> + TimestampTz origin_timestamp)
> +{
> + Relation rel;
> + Form_pg_subscription subform;
> + HeapTuple tup;
> + bool nulls[Natts_pg_subscription];
> + bool replaces[Natts_pg_subscription];
> + Datum values[Natts_pg_subscription];
> +
> + memset(values, 0, sizeof(values));
> + memset(nulls, false, sizeof(nulls));
> + memset(replaces, false, sizeof(replaces));
> +
> + if (!IsTransactionState())
> + StartTransactionCommand();
> +
> + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0,
> + AccessShareLock);
>
> It is important to add a comment as to why we need a lock here.
Added.
>
> 9.
> + * needs to be set subskipxid again. We can reduce the possibility by
> + * logging a replication origin WAL record to advance the origin LSN
> + * instead but it doesn't seem to be worth since it's a very minor case.
>
> You can also add here that there is no way to advance origin_timestamp
> so that would be inconsistent.
Added.
>
> 10.
> +clear_subscription_skip_xid(TransactionId xid, XLogRecPtr origin_lsn,
> + TimestampTz origin_timestamp)
> {
> ..
> ..
> + if (!IsTransactionState())
> + StartTransactionCommand();
> ..
> ..
> + CommitTransactionCommand();
> ..
> }
>
> The transaction should be committed in this function if it is started
> here otherwise it should be the responsibility of the caller to commit
> it.
Fixed.
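For reference, the ownership rule from comment 10 can be sketched as below. This is only a toy model and not necessarily what the updated patch does: toy_in_xact, toy_start_xact, toy_commit_xact, and toy_clear_skip_xid are hypothetical names standing in for the real transaction machinery. The function commits only a transaction it started itself and otherwise leaves the commit to the caller.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical transaction state, standing in for the real machinery. */
static bool toy_in_xact = false;
static int  toy_commit_count = 0;

static void
toy_start_xact(void)
{
    toy_in_xact = true;
}

static void
toy_commit_xact(void)
{
    toy_in_xact = false;
    toy_commit_count++;
}

/*
 * Toy version of clearing the skip XID: commit only if this function
 * opened the transaction; otherwise the caller remains responsible.
 */
static void
toy_clear_skip_xid(void)
{
    bool started_tx = false;

    if (!toy_in_xact)
    {
        toy_start_xact();
        started_tx = true;
    }

    /* ... the catalog update would happen here ... */

    if (started_tx)
        toy_commit_xact();
}
```

Called outside a transaction, toy_clear_skip_xid() starts and commits its own; called inside one, it leaves the caller's transaction open.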
I've attached an updated patch that incorporates these comments except
for 6 and 7, which probably need more discussion. The comments
from Vignesh are also incorporated.
Regards,
--
Masahiko Sawada
EDB: https://www.enterprisedb.com/
Attachment | Content-Type | Size |
---|---|---|
v5-0001-Add-ALTER-SUBSCRIPTION-.-SKIP-to-skip-the-transac.patch | application/octet-stream | 58.5 KB |