Re: CDC/ETL system on top of logical replication with pgoutput, custom client

From: "Euler Taveira" <euler(at)eulerto(dot)com>
To: José Neves <rafaneves3(at)msn(dot)com>, pgsql-hackers <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: CDC/ETL system on top of logical replication with pgoutput, custom client
Date: 2023-07-31 14:27:46
Message-ID: 4c1a4dcb-3bb8-49d9-b91b-098a64a02f61@app.fastmail.com
Views: Raw Message | Whole Thread | Download mbox | Resend email
Thread:
Lists: pgsql-hackers

On Sat, Jul 29, 2023, at 8:07 PM, José Neves wrote:
> I'm attempting to develop a CDC on top of Postgres, currently using 12, the last minor, with a custom client, and I'm running into issues with data loss caused by out-of-order logical replication messages.

Can you provide a test case to show this issue? Did you try in a newer version?

> The problem is as follows: postgres streams *A, B, D, G, K, I, P *logical replication events, upon exit signal we stop consuming new events at LSN *K,* and we wait 30s for out-of-order events. Let's say that we only got *A*, (and *K* ofc) so in the following 30s, we get *B, D*, however, for whatever reason, *G* never arrived. As with pgoutput-based logical replication we have no way to calculate the next LSN, we have no idea that *G* was missing, so we assumed that it all arrived, committing *K *to postgres slot and shutdown. In the next run, our worker will start receiving data from *K* forward, and *G* is lost forever...
> Meanwhile postgres moves forward with archiving and we can't go back to check if we lost anything. And even if we could, would be extremely inefficient.

Logical decoding provides the changes to output plugin at commit time. You
mentioned the logical replication events but didn't say which are part of the
same transaction. Let's say A, B, D and K are changes from the same transaction
and G, I and P are changes from another transaction. The first transaction will
be available when it processes K. The second transaction will be provided when
the logical decoding processes P.

You didn't say how your consumer is working. Are you sure your consumer doesn't
get the second transaction? If your consumer is advancing the replication slot
*after* receiving K (using pg_replication_slot_advance), it is doing it wrong.
Another common problem with consumer is that it uses
pg_logical_slot_get_changes() but *before* using the data it crashes; in this
case, the data is lost.

It is hard to say where the problem is if you didn't provide enough information
about the consumer logic and the WAL information (pg_waldump output) around the
time you detect the data loss.

> In sum, the issue comes from the fact that postgres will stream events with unordered LSNs on high transactional systems, and that pgoutput doesn't have access to enough information to calculate the next or last LSN, so we have no way to check if we receive all the data that we are supposed to receive, risking committing an offset that we shouldn't as we didn't receive yet preceding data.
>
> It seems very either to me that none of the open-source CDC projects that I looked into care about this. They always assume that the next LSN received is... well the next one, and commit that one, so upon restart, they are vulnerable to the same issue. So... either I'm missing something... or we have a generalized assumption causing data loss under certain conditions all over.

Let me illustrate the current behavior. Let's say there are 3 concurrent
transactions.

Session A
==========

euler=# SELECT pg_create_logical_replication_slot('repslot1', 'wal2json');
pg_create_logical_replication_slot
------------------------------------
(repslot1,0/369DF088)
(1 row)

euler=# create table foo (a int primary key);
CREATE TABLE
euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(1, 2);
INSERT 0 2

Session B
==========

euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(11, 12);
INSERT 0 2

Session C
==========

euler=# BEGIN;
BEGIN
euler=*# INSERT INTO foo (a) SELECT generate_series(21, 22);
INSERT 0 2

Session A
==========

euler=*# INSERT INTO foo (a) VALUES(3);
INSERT 0 1

Session B
==========

euler=*# INSERT INTO foo (a) VALUES(13);
INSERT 0 1

Session C
==========

euler=*# INSERT INTO foo (a) VALUES(23);
INSERT 0 1
euler=*# COMMIT;
COMMIT

Session B
==========

euler=*# COMMIT;
COMMIT

Session A
==========

euler=*# COMMIT;
COMMIT

The output is:

euler=# SELECT * FROM pg_logical_slot_peek_changes('repslot1', NULL, NULL, 'format-version', '2', 'include-types', '0');
lsn | xid | data
------------+--------+------------------------------------------------------------------------------------
0/369E4800 | 454539 | {"action":"B"}
0/36A05088 | 454539 | {"action":"C"}
0/36A05398 | 454542 | {"action":"B"}
0/36A05398 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":21}]}
0/36A05418 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":22}]}
0/36A05658 | 454542 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":23}]}
0/36A057C0 | 454542 | {"action":"C"}
0/36A05258 | 454541 | {"action":"B"}
0/36A05258 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":11}]}
0/36A052D8 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":12}]}
0/36A05598 | 454541 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":13}]}
0/36A057F0 | 454541 | {"action":"C"}
0/36A050C0 | 454540 | {"action":"B"}
0/36A050C0 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":1}]}
0/36A051A0 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":2}]}
0/36A054D8 | 454540 | {"action":"I","schema":"public","table":"foo","columns":[{"name":"a","value":3}]}
0/36A05820 | 454540 | {"action":"C"}
(17 rows)

Since session C committed first, it is the first transaction available to output
plugin (wal2json). Transaction 454541 is the next one that is available because
it committed after session C (transaction 454542) and the first transaction
that started (session A) is the last one available. You can also notice that
the first transaction (454540) is the last one available.

Your consumer cannot rely on LSN position or xid to track the progress.
Instead, Postgres provides a replication progress mechanism [1] to do it.

[1] https://www.postgresql.org/docs/current/replication-origins.html

--
Euler Taveira
EDB https://www.enterprisedb.com/

In response to

Responses

Browse pgsql-hackers by date

  From Date Subject
Next Message Masahiko Sawada 2023-07-31 15:15:25 Inaccurate comments in ReorderBufferCheckMemoryLimit()
Previous Message José Neves 2023-07-31 14:16:22 RE: CDC/ETL system on top of logical replication with pgoutput, custom client