Logical replication (pgoutput plugin) in streaming mode: peek() always starts from beginning of transaction, not from latest stream block

From: ledieudesmammouths(at)free(dot)fr
To: pgsql-general(at)lists(dot)postgresql(dot)org
Date: 2023-11-08 19:01:31
Message-ID: 1572490564.304409181.1699470091391.JavaMail.root@zimbra59-e10.priv.proxad.net
Lists: pgsql-general

Hi everyone,

When using logical replication with the pgoutput plugin on PG 16, we do the following:
1) SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'false')
2) Get the LSN of the last row (the Commit record)
3) SELECT * FROM pg_replication_slot_advance('test_slot_v1', <Commit LSN>);
4) Repeat.

And this works perfectly fine when streaming = false. With streaming turned on, we expect the same thing to happen, with two differences. First, the LSN passed to pg_replication_slot_advance() is that of a Stream End record. Second, the next call to pg_logical_slot_peek_binary_changes() should return the subsequent Stream Start record. Instead, the output starts over from the beginning of the transaction (its first Stream Start record). Observe:
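Step 2 of the loop above can be sketched in Python. The sketch assumes each peeked row has already been fetched into an (lsn, xid, data) tuple with data as bytes; the driver code is omitted. advance_target is a hypothetical helper, not part of any PostgreSQL client library. It returns the LSN of the last Commit ('C') or Stream Commit ('c') row, so in streaming mode it never picks a Stream End row. In the demo below, advancing to a Stream Commit LSN was the only case where the next peek() resumed where expected.

```python
from typing import Optional, Sequence, Tuple

# One row of pg_logical_slot_peek_binary_changes(): (lsn, xid, data).
# Assumption: the rows are already fetched and the bytea column is Python bytes.
Row = Tuple[str, int, bytes]

COMMIT = ord("C")         # pgoutput Commit message (non-streamed transaction)
STREAM_COMMIT = ord("c")  # pgoutput Stream Commit message (streamed transaction)


def advance_target(rows: Sequence[Row]) -> Optional[str]:
    """Return the LSN of the last Commit or Stream Commit row in the batch,
    or None when the batch holds no completed transaction."""
    for lsn, _xid, data in reversed(rows):
        # data[0] is the message-type byte, as an int
        if data and data[0] in (COMMIT, STREAM_COMMIT):
            return lsn
    return None


# Usage with a few rows taken from the demo (Insert/Delete rows omitted)
batch = [
    ("2/A222A20", 1112, bytes.fromhex("530000045801")),  # Stream Start
    ("2/C141BE8", 1112, bytes.fromhex("45")),            # Stream End
]
print(advance_target(batch))  # None: only a Stream End so far
batch.append(("2/DF59950", 1112, bytes.fromhex(
    "630000045800000000020df59918000000020df599500002aca72900f8a8")))
print(advance_target(batch))  # 2/DF59950
```

This does not address the re-sending of a still-open streamed transaction, which is what the rest of this message is about. It only avoids choosing a Stream End LSN as the advance point.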

*** Demo starts ***
*** Initially there are no changes, peek() returns nothing: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
-----+-----+------
(0 rows)

*** Slot status: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
test_slot_v1 | f | 2/98CE060 | 2/98CE060
(1 rows)

*** Now make some changes (delete then insert a bunch of records) and call peek() ***
*** The predicate filters out Delete and Insert records, leaving Stream Start (\x53 = S), ***
*** Relation (\x52 = R), Stream End (\x45 = E; called Stream Stop in the protocol documentation), and Stream Commit (\x63 = c) ***

abinitio=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
2/A222A20 | 1112 | \x530000045801
2/A222A20 | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/C141BE8 | 1112 | \x45
2/C141C28 | 1112 | \x530000045800
2/DF598D8 | 1112 | \x45
2/DF59950 | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
2/DF59950 | 1114 | \x530000045a01
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/108918D0 | 1114 | \x45
2/108918D0 | 1114 | \x530000045a00
2/131E1310 | 1114 | \x45
2/131E1310 | 1114 | \x530000045a00
2/137D7768 | 1114 | \x45
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)

*** It was a peek() so the status is unchanged: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
test_slot_v1 | f | 2/98CE060 | 2/98CE060
(1 rows)

*** Now advance the slot to the first Stream End record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/C141BE8');
slot_name | end_lsn
--------------+-----------
test_slot_v1 | 2/C141BE8
(1 row)

*** confirmed_flush_lsn is updated as expected: ***

=> SELECT slot_name, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | active | restart_lsn | confirmed_flush_lsn
--------------+--------+-------------+---------------------
test_slot_v1 | f | 2/9B09D10 | 2/C141BE8
(1 rows)

*** Now peek() again. It starts from an LSN earlier than confirmed_flush_lsn: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
2/A222A20 | 1112 | \x530000045801
2/A222A20 | 1112 | \x52000004590000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/C141BE8 | 1112 | \x45
2/C141C28 | 1112 | \x530000045800
2/DF598D8 | 1112 | \x45
2/DF59950 | 1112 | \x630000045800000000020df59918000000020df599500002aca72900f8a8
2/DF59950 | 1114 | \x530000045a01
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/108918D0 | 1114 | \x45
2/108918D0 | 1114 | \x530000045a00
2/131E1310 | 1114 | \x45
2/131E1310 | 1114 | \x530000045a00
2/137D7768 | 1114 | \x45
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(14 rows)

*** Next advance to the Stream Commit record: ***

=> SELECT * FROM pg_replication_slot_advance('test_slot_v1', '2/DF59950');
slot_name | end_lsn
--------------+-----------
test_slot_v1 | 2/DF59950
(1 row)

*** This time the peek() starts from the correct LSN: ***

=> SELECT * FROM pg_logical_slot_peek_binary_changes('test_slot_v1', null, null,'publication_names', 'cdc', 'proto_version', '4', 'streaming', 'true') WHERE SUBSTRING(data, 1,1) NOT IN ('\x49', '\x44');
lsn | xid | data
------------+------+--------------------------------------------------------------------------------------------------------------------------------------
2/DF59950 | 1114 | \x530000045a01
2/DF59950 | 1114 | \x520000045a0000402a7075626c6963007265706c69636174696f6e5f746573745f7631006400020169640000000017ffffffff006e616d650000000019ffffffff
2/108918D0 | 1114 | \x45
2/108918D0 | 1114 | \x530000045a00
2/131E1310 | 1114 | \x45
2/131E1310 | 1114 | \x530000045a00
2/137D7768 | 1114 | \x45
2/137E8448 | 1114 | \x630000045a0000000002137e841800000002137e84480002aca729812c96
(8 rows)

*** End of demo ***
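The WHERE clause in the demo keeps only four message types. The following minimal Python sketch decodes those four so the data column can be read without a protocol reference. It follows the field layouts in PostgreSQL's "Logical Replication Message Formats" documentation for protocol version 2 and later, where the 'E' message is named Stream Stop. decode_message, read_cstring and format_lsn are illustrative helper names, not part of any PostgreSQL client library. The sketch assumes well-formed input and ignores every other message type.

```python
import struct
from datetime import datetime, timedelta, timezone
from typing import Tuple

# pgoutput timestamps are microseconds since 2000-01-01 00:00 UTC
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def format_lsn(value: int) -> str:
    """Render a 64-bit LSN in PostgreSQL's 'X/Y' text form."""
    return f"{value >> 32:X}/{value & 0xFFFFFFFF:X}"


def read_cstring(buf: bytes, pos: int) -> Tuple[str, int]:
    """Read a NUL-terminated string at pos; return it and the next offset."""
    end = buf.index(b"\x00", pos)
    return buf[pos:end].decode("utf-8"), end + 1


def decode_message(data: bytes) -> dict:
    """Decode the message types kept by the demo's WHERE filter (S, R, E, c)."""
    kind = chr(data[0])
    if kind == "S":  # Stream Start: Int32 xid, Int8 first-segment flag
        xid, first = struct.unpack_from("!IB", data, 1)
        return {"type": "Stream Start", "xid": xid, "first_segment": first == 1}
    if kind == "E":  # Stream Stop ("Stream End" in the demo): no payload
        return {"type": "Stream Stop"}
    if kind == "c":  # Stream Commit: xid, flags, commit LSN, end LSN, timestamp
        xid, _flags, commit_lsn, end_lsn, ts = struct.unpack_from("!IBQQq", data, 1)
        return {
            "type": "Stream Commit",
            "xid": xid,
            "commit_lsn": format_lsn(commit_lsn),
            "end_lsn": format_lsn(end_lsn),
            "commit_time": PG_EPOCH + timedelta(microseconds=ts),
        }
    if kind == "R":  # Relation; the leading xid is present only when streaming
        xid, rel_oid = struct.unpack_from("!II", data, 1)
        nspname, pos = read_cstring(data, 9)
        relname, pos = read_cstring(data, pos)
        replica_identity = chr(data[pos])
        (ncols,) = struct.unpack_from("!h", data, pos + 1)
        pos += 3
        columns = []
        for _ in range(ncols):
            col_flags = data[pos]  # 1 = column is part of the key
            name, pos = read_cstring(data, pos + 1)
            type_oid, typmod = struct.unpack_from("!Ii", data, pos)
            pos += 8
            columns.append((name, type_oid, typmod, col_flags))
        return {
            "type": "Relation",
            "xid": xid,
            "oid": rel_oid,
            "relation": f"{nspname}.{relname}",
            "replica_identity": replica_identity,
            "columns": columns,
        }
    return {"type": f"unhandled ({kind!r})"}


print(decode_message(bytes.fromhex("530000045801")))
# {'type': 'Stream Start', 'xid': 1112, 'first_segment': True}
c = decode_message(bytes.fromhex(
    "630000045800000000020df59918000000020df599500002aca72900f8a8"))
print(c["commit_lsn"], c["end_lsn"])  # 2/DF59918 2/DF59950
```

Decoded this way, the first row of the second peek (\x530000045801) is again a Stream Start with the first-segment flag set, so transaction 1112 is being sent again from its first segment. The lsn column shown for each Stream Commit row matches the end LSN carried inside the message.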

The question is whether this is by design or a bug. If it is by design, could someone explain how this is meant to be used? It is not clear. It does work eventually when the upto_nchanges argument is NULL: once the transaction completes, we get a Stream Commit record and can advance to it. In the meantime, however, we will have ingested a lot of duplicate records that we now have to deal with.

When upto_nchanges is not NULL, we are stuck. peek() returns one or more Stream blocks until the number of returned rows exceeds upto_nchanges. It then returns the same blocks over and over again, because we cannot advance and therefore never see the Stream Commit record.
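The rewinding can be detected on the client by comparing the LSN of the first peeked row with the slot's confirmed_flush_lsn. In the minimal Python sketch below, parse_lsn and peek_rewound are hypothetical helper names. The textual X/Y form is the high and low 32 bits in hexadecimal, so the sketch converts it to an integer before comparing; comparing the strings directly can give wrong answers. Inside the database, values of type pg_lsn can be compared directly. This is only a diagnostic and does not deduplicate anything.

```python
def parse_lsn(text: str) -> int:
    """Convert a pg_lsn text value 'X/Y' (hex high/low 32 bits) to an integer."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def peek_rewound(first_row_lsn: str, confirmed_flush_lsn: str) -> bool:
    """True when a peek() batch starts strictly before confirmed_flush_lsn."""
    return parse_lsn(first_row_lsn) < parse_lsn(confirmed_flush_lsn)


# Values from the demo (for the third peek, confirmed_flush_lsn is assumed
# to equal the advance target 2/DF59950, since the demo does not show it)
print(peek_rewound("2/A222A20", "2/C141BE8"))  # second peek: True
print(peek_rewound("2/DF59950", "2/DF59950"))  # third peek: False

# Plain string comparison is misleading: '1' sorts before 'D' ...
print("2/108918D0" < "2/DF59950")                        # True
# ... although 2/108918D0 is the later WAL position
print(parse_lsn("2/108918D0") > parse_lsn("2/DF59950"))  # True
```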

Thank you.

Guillaume.
