Re: Subscription stuck at initialize state

From: Vijaykumar Jain <vijaykumarjain(dot)github(at)gmail(dot)com>
To: Abhishek Bhola <abhishek(dot)bhola(at)japannext(dot)co(dot)jp>
Cc: Steve Baldwin <steve(dot)baldwin(at)gmail(dot)com>, pgsql-general <pgsql-general(at)postgresql(dot)org>
Subject: Re: Subscription stuck at initialize state
Date: 2022-02-03 08:25:47
Message-ID: CAM+6J949E4YGZy9TePPTzspf8u49rwuTNVQm0396u3sOGZDSqg@mail.gmail.com
Lists: pgsql-general

On Thu, 3 Feb 2022 at 12:44, Abhishek Bhola <abhishek(dot)bhola(at)japannext(dot)co(dot)jp>
wrote:

> Hi Vijaykumar,
>
> I checked the pg_subscription_rel and all the tables in that subscription
> are in the state - i (initialize).
> I also tried creating a new publication on the source DB with just one
> table and tried to subscribe it, it doesn't work either.
> However, when I try to subscribe it on some other DB than the one
> mentioned above, it works.
> From this I deduce that the publication and the source DB are okay, and the
> problem is on the target DB and its subscription.
> Maybe I will have to restart the DB as a last resort, but I am not sure if
> that will solve the problem either.
>
>
This is a very verbose mail, so if it is too noisy, kindly ignore it.

Otherwise:

Can you check basic connectivity from the subscriber to the publisher using
psql and run a simple query?
Can you also share your "create publication" and "create subscription"
statements, please?

I am attaching a general logical replication setup on a single server, with
some scenarios where replication breaks, how to monitor it, and how to
resume it.

postgres@controller:~$ tail db1/postgresql.conf db2/postgresql.conf
==> db1/postgresql.conf <==

# Add settings for extensions here
wal_level=logical
archive_mode = on
archive_command = '/bin/true'
max_wal_size = 48MB
min_wal_size = 32MB
shared_buffers = 32MB
port = 5001
max_logical_replication_workers = 10

==> db2/postgresql.conf <==
# Add settings for extensions here
wal_level=logical
archive_mode = on
archive_command = '/bin/true'
max_wal_size = 48MB
min_wal_size = 32MB
shared_buffers = 32MB
port = 5002
max_logical_replication_workers = 10

postgres@controller:~$ pg_ctl -D db1 -l db1.log start
waiting for server to start.... done
server started
postgres@controller:~$ pg_ctl -D db2 -l db2.log start
waiting for server to start.... done
server started
postgres@controller:~$ psql -p 5001
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# \x
Expanded display is on.
postgres=# create table t1(id int primary key);
CREATE TABLE
postgres=# create table t2(id int); -- this will throw an error on delete: no replica identity
CREATE TABLE
postgres=# insert into t1 select x from generate_series(1, 100) x;
INSERT 0 100
postgres=# insert into t2 select x from generate_series(1, 100) x;
INSERT 0 100
postgres=# checkpoint;
CHECKPOINT
postgres=# \q
postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# create table t1(id int primary key);
CREATE TABLE
postgres=# create table t2(id int);
CREATE TABLE
postgres=# \q
postgres@controller:~$ ps aux | grep -i postgres:
postgres 1116 0.0 0.4 113632 8232 ? Ss 13:24 0:00 postgres: checkpointer
postgres 1117 0.0 0.2 113496 5868 ? Ss 13:24 0:00 postgres: background writer
postgres 1118 0.0 0.3 113496 6964 ? Ss 13:24 0:00 postgres: walwriter
postgres 1119 0.0 0.4 114032 8432 ? Ss 13:24 0:00 postgres: autovacuum launcher
postgres 1120 0.0 0.2 113496 4132 ? Ss 13:24 0:00 postgres: archiver
postgres 1121 0.0 0.2 72112 4896 ? Ss 13:24 0:00 postgres: stats collector
postgres 1122 0.0 0.3 113928 6732 ? Ss 13:24 0:00 postgres: logical replication launcher
postgres 1128 0.0 0.3 113496 5956 ? Ss 13:24 0:00 postgres: checkpointer
postgres 1129 0.0 0.3 113496 5916 ? Ss 13:24 0:00 postgres: background writer
postgres 1130 0.0 0.3 113496 6952 ? Ss 13:24 0:00 postgres: walwriter
postgres 1131 0.0 0.4 114032 8384 ? Ss 13:24 0:00 postgres: autovacuum launcher
postgres 1132 0.0 0.2 113496 4160 ? Ss 13:24 0:00 postgres: archiver
postgres 1133 0.0 0.2 72112 4868 ? Ss 13:24 0:00 postgres: stats collector
postgres 1134 0.0 0.3 113928 6804 ? Ss 13:24 0:00 postgres: logical replication launcher
postgres 1186 0.0 0.0 8164 724 pts/0 S+ 13:26 0:00 grep -i postgres:
postgres@controller:~$ psql -p 5001
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# create publication mypub for all tables with (publish='insert,update,delete,truncate');
CREATE PUBLICATION
postgres=# \q
postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# create subscription mysub connection 'port=5001' publication mypub;
NOTICE: created replication slot "mysub" on publisher
CREATE SUBSCRIPTION
postgres=# select count(1) from t1;
count
-------
100
(1 row)

postgres=# select count(1) from t2;
count
-------
100
(1 row)

postgres=# \q

postgres@controller:~$ ps aux | grep postgres: # strace the pids below to watch for movement
postgres 1195 0.0 0.7 114800 14744 ? Ss 13:27 0:00 postgres: logical replication worker for subscription 16392
postgres 1196 0.0 0.7 114728 14676 ? Ss 13:27 0:00 postgres: walsender postgres [local] START_REPLICATION

# the logical replication slot is active (active = t), has an active_pid (the process we strace), and its LSNs are moving (= healthy)
postgres@controller:~$ psql -p 5001
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# \x
Expanded display is on.
postgres=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+----------
slot_name | mysub
plugin | pgoutput
slot_type | logical
datoid | 13726
database | postgres
temporary | f
active | t
active_pid | 1196
xmin |
catalog_xmin | 740
restart_lsn | 0/20308F0
confirmed_flush_lsn | 0/2030928
wal_status | reserved
safe_wal_size |
two_phase | f

postgres=# \q
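
To judge whether a slot's LSNs are really moving, it helps to turn the X/Y text form into a number and subtract two readings. The following is only a minimal Python sketch: the helper names lsn_to_int and lsn_diff are made up for illustration, and the two sample values are confirmed_flush_lsn readings taken from this session. On the server itself, pg_wal_lsn_diff() does the same arithmetic.

```python
def lsn_to_int(lsn: str) -> int:
    """Convert an LSN in its text form, e.g. '0/2030928', to a byte position.

    The part before the slash is the high 32 bits and the part after it
    is the low 32 bits, both written in hexadecimal.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lsn_diff(newer: str, older: str) -> int:
    """Number of WAL bytes between two LSN readings (newer - older)."""
    return lsn_to_int(newer) - lsn_to_int(older)


# Two confirmed_flush_lsn readings of slot "mysub" from this session.
first_reading = "0/2030928"
later_reading = "0/2039C30"

moved = lsn_diff(later_reading, first_reading)
if moved > 0:
    print(f"slot advanced by {moved} bytes")
else:
    print("slot did not move between the two readings")
```

A difference that stays at zero over several readings while the publisher is receiving writes is a hint that the subscriber is not confirming anything.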

# strace the publisher's walsender process (pid 1196) and check that it keeps moving
root@controller:~# strace -p 1196
strace: Process 1196 attached
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 29577) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
sendto(9, "d\0\0\0\26k\0\0\0\0\2\3\t(\0\2z\27H\376\271\323\0", 23, 0, NULL,
0) = 23
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 29999) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 30000) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 30000) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
sendto(9, "d\0\0\0\26k\0\0\0\0\2\3\t(\0\2z\27J\311_\352\0", 23, 0, NULL, 0)
= 23
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 29999) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 30000) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 30000) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
sendto(9, "d\0\0\0\26k\0\0\0\0\2\3\t(\0\2z\27L\224?s\0", 23, 0, NULL, 0) =
23
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 30000) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\0\0\0\2\3\t(\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228944, u64=94283776310736}}], 1, 30000) =
1
read(10,
"\27\0\0\0\0\0\0\0\0\0\0\0^\4\0\0p\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 1024)
= 128
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6,
"\r\321\5\0\1\0\0\0\0\0\3\2\0\0\0\0\0\3\0\0\0\0\0\0\2365\0\0\210\n\0\0"...,
8192, 196608) = 8192
stat("pg_logical/snapshots/0-2030928.snap", 0x7fff98dfb6b0) = -1 ENOENT (No
such file or directory)
unlink("pg_logical/snapshots/0-2030928.snap.1196.tmp") = -1 ENOENT (No such
file or directory)
openat(AT_FDCWD, "pg_logical/snapshots/0-2030928.snap.1196.tmp",
O_WRONLY|O_CREAT|O_EXCL, 0600) = 7
write(7,
"\1\340\241Q\346\223#\367\4\0\0\0\200\0\0\0\2\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"...,
128) = 128
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_logical/snapshots", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
rename("pg_logical/snapshots/0-2030928.snap.1196.tmp",
"pg_logical/snapshots/0-2030928.snap") = 0
openat(AT_FDCWD, "pg_logical/snapshots/0-2030928.snap", O_RDWR) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_logical/snapshots", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_replslot/mysub/state.tmp", O_WRONLY|O_CREAT|O_EXCL,
0600) = 7
write(7,
"\241\34\5\1W\23\376Y\2\0\0\0\270\0\0\0mysub\0\0\0\0\0\0\0\0\0\0\0"...,
200) = 200
fsync(7) = 0
close(7) = 0
rename("pg_replslot/mysub/state.tmp", "pg_replslot/mysub/state") = 0
openat(AT_FDCWD, "pg_replslot/mysub/state", O_RDWR) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_replslot/mysub", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_replslot", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
recvfrom(9, 0x7fff98dfc077, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6,
"\r\321\5\0\1\0\0\0\0\0\3\2\0\0\0\0\0\3\0\0\0\0\0\0\2365\0\0\210\n\0\0"...,
8192, 196608) = 8192
sendto(9, "d\0\0\0\26k\0\0\0\0\2\3\t`\0\2z\27M\223\336\351\0", 23, 0, NULL,
0) = 23
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t`\0\0\0\0\2\3\t`\0\0\0\0\2\3\t`\0\2z"..., 8192, 0,
NULL, NULL) = 38
recvfrom(9, 0x7fff98dfc077, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
sendto(9, "d\0\0\0\26k\0\0\0\0\2\3\t\330\0\2z\27M\223\341P\0", 23, 0, NULL,
0) = 23
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 30000) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\t\330\0\0\0\0\2\3\t\330\0\0\0\0\2\3\t\330\0\2z"...,
8192, 0, NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228944, u64=94283776310736}}], 1, 30000) =
1
read(10,
"\27\0\0\0\0\0\0\0\0\0\0\0^\4\0\0p\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 1024)
= 128
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6,
"\r\321\5\0\1\0\0\0\0\0\3\2\0\0\0\0\0\3\0\0\0\0\0\0\2365\0\0\210\n\0\0"...,
8192, 196608) = 8192
stat("pg_logical/snapshots/0-20309D8.snap", 0x7fff98dfb6b0) = -1 ENOENT (No
such file or directory)
unlink("pg_logical/snapshots/0-20309D8.snap.1196.tmp") = -1 ENOENT (No such
file or directory)
openat(AT_FDCWD, "pg_logical/snapshots/0-20309D8.snap.1196.tmp",
O_WRONLY|O_CREAT|O_EXCL, 0600) = 7
write(7,
"\1\340\241Q\336\33\320]\4\0\0\0\200\0\0\0\2\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"...,
128) = 128
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_logical/snapshots", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
rename("pg_logical/snapshots/0-20309D8.snap.1196.tmp",
"pg_logical/snapshots/0-20309D8.snap") = 0
openat(AT_FDCWD, "pg_logical/snapshots/0-20309D8.snap", O_RDWR) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_logical/snapshots", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_replslot/mysub/state.tmp", O_WRONLY|O_CREAT|O_EXCL,
0600) = 7
write(7,
"\241\34\5\1\353A\375\303\2\0\0\0\270\0\0\0mysub\0\0\0\0\0\0\0\0\0\0\0"...,
200) = 200
fsync(7) = 0
close(7) = 0
rename("pg_replslot/mysub/state.tmp", "pg_replslot/mysub/state") = 0
openat(AT_FDCWD, "pg_replslot/mysub/state", O_RDWR) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_replslot/mysub", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
openat(AT_FDCWD, "pg_replslot", O_RDONLY) = 7
fsync(7) = 0
close(7) = 0
recvfrom(9, 0x7fff98dfc077, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
sendto(9, "d\0\0\0\26k\0\0\0\0\2\3\n\20\0\2z\27M\346W\275\0", 23, 0, NULL,
0) = 23
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 24596) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\n\20\0\0\0\0\2\3\n\20\0\0\0\0\2\3\n\20\0\2z"...,
8192, 0, NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228944, u64=94283776310736}}], 1, 30000) =
1
read(10,
"\27\0\0\0\0\0\0\0\0\0\0\0\237\5\0\0p\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"...,
1024) = 128
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6,
"\r\321\5\0\1\0\0\0\0\0\3\2\0\0\0\0\0\3\0\0\0\0\0\0\2365\0\0\210\n\0\0"...,
8192, 196608) = 8192
pread64(6, "\r\321\5\0\1\0\0\0\0 \3\2\0\0\0\0\21\10\0\0\0\0\0\0005\0\0forei"...,
8192, 204800) = 8192
recvfrom(9, 0x7fff98dfc077, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6, "\r\321\5\0\1\0\0\0\0@\3\2\0\0\0\0U\7\0\0\0\0\0\0\0\0\0\0\0001\0\33"...,
8192, 212992) = 8192
recvfrom(9, 0x7fff98dfc077, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6,
"\r\321\5\0\1\0\0\0\0`\3\2\0\0\0\0a\16\0\0\0\0\0\0\v\30\0\337\4\0\0\203"...,
8192, 221184) = 8192
recvfrom(9, 0x7fff98dfc077, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6,
"\r\321\5\0\1\0\0\0\0\200\3\2\0\0\0\0\225\5\0\0\0\0\0\0\0\0\0\0\0>\0F"...,
8192, 229376) = 8192
sendto(9,
"d\0\0\0002w\0\0\0\0\2\3\n\20\0\0\0\0\2\3\n\20\0\2z\27N]\200\371B\0"...,
51, 0, NULL, 0) = 51
brk(0x55c0270d1000) = 0x55c0270d1000
brk(0x55c0270ca000) = 0x55c0270ca000
openat(AT_FDCWD, "base/13726/3455", O_RDWR) = 7
pread64(7, "\0\0\0\0h;g\1\0\0\0\0\330\2\360\24\360\37\4 \0\0\0\0\340\237 \0\320\237 \0"...,
8192, 8192) = 8192
sendto(9,
"d\0\0\0007w\0\0\0\0\2\3\233\370\0\0\0\0\2\3\233\370\0\2z\27N]\205\334C\0"...,
56, 0, NULL, 0) = 56
rt_sigprocmask(SIG_SETMASK, [URG], NULL, 8) = 0
sendto(8,
"\27\0\0\0\220\0\0\0mysub\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 144,
0, NULL, 0) = 144
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\233\370\0\0\0\0\2\3\233\370\0\0\0\0\2\3\233\370\0\2z"...,
8192, 0, NULL, NULL) = 38
recvfrom(9, 0x7fff98dfc077, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228920, u64=94283776310712}}], 1, 30000) =
1
recvfrom(9, "d", 1, 0, NULL, NULL) = 1
recvfrom(9,
"\0\0\0&r\0\0\0\0\2\3\233\370\0\0\0\0\2\3\233\370\0\0\0\0\2\3\233\370\0\2z"...,
8192, 0, NULL, NULL) = 38
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
epoll_wait(4, [{EPOLLIN, {u32=654228944, u64=94283776310736}}], 1, 30000) =
1
read(10,
"\27\0\0\0\0\0\0\0\0\0\0\0^\4\0\0p\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"..., 1024)
= 128
recvfrom(9, 0x7fff98dfbf07, 1, 0, NULL, NULL) = -1 EAGAIN (Resource
temporarily unavailable)
pread64(6,
"\r\321\5\0\1\0\0\0\0\200\3\2\0\0\0\0\225\5\0\0\0\0\0\0\0\0\0\0\0>\0F"...,
8192, 229376) = 8192
stat("pg_logical/snapshots/0-2039BF8.snap", 0x7fff98dfb6b0) = -1 ENOENT (No
such file or directory)
unlink("pg_logical/snapshots/0-2039BF8.snap.1196.tmp") = -1 ENOENT (No such
file or directory)
openat(AT_FDCWD, "pg_logical/snapshots/0-2039BF8.snap.1196.tmp",
O_WRONLY|O_CREAT|O_EXCL, 0600) = 12
write(12,
"\1\340\241Q\326\370d\25\4\0\0\0\204\0\0\0\2\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"...,
132) = 132
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_logical/snapshots", O_RDONLY) = 12
fsync(12) = 0
close(12) = 0
rename("pg_logical/snapshots/0-2039BF8.snap.1196.tmp",
"pg_logical/snapshots/0-2039BF8.snap") = 0
openat(AT_FDCWD, "pg_logical/snapshots/0-2039BF8.snap", O_RDWR) = 12
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_logical/snapshots", O_RDONLY) = 12
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_replslot/mysub/state.tmp", O_WRONLY|O_CREAT|O_EXCL,
0600) = 12
write(12,
"\241\34\5\1>-\275R\2\0\0\0\270\0\0\0mysub\0\0\0\0\0\0\0\0\0\0\0"..., 200)
= 200
fsync(12) = 0
close(12) = 0
rename("pg_replslot/mysub/state.tmp", "pg_replslot/mysub/state") = 0
openat(AT_FDCWD, "pg_replslot/mysub/state", O_RDWR) = 12
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_replslot/mysub", O_RDONLY) = 12
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_replslot", O_RDONLY) = 12
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_replslot/mysub/state.tmp", O_WRONLY|O_CREAT|O_EXCL,
0600) = 12
write(12,
"\241\34\5\1\237\331q|\2\0\0\0\270\0\0\0mysub\0\0\0\0\0\0\0\0\0\0\0"...,
200) = 200
fsync(12) = 0
close(12) = 0
rename("pg_replslot/mysub/state.tmp", "pg_replslot/mysub/state") = 0
openat(AT_FDCWD, "pg_replslot/mysub/state", O_RDWR) = 12
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_replslot/mysub", O_RDONLY) = 12
fsync(12) = 0
close(12) = 0
openat(AT_FDCWD, "pg_replslot", O_RDONLY) = 12
fsync(12) = 0
close(12) = 0

*# strace the subscription worker similarly*
strace -p 1195

*# check the state of the subscription*
postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# \x
Expanded display is on.
postgres=# select * from pg_subscription_rel;
-[ RECORD 1 ]---------
srsubid | 16392
srrelid | 16389
*srsubstate | r*
srsublsn | 0/20308F0
-[ RECORD 2 ]---------
srsubid | 16392
srrelid | 16384
*srsubstate | r*
srsublsn | 0/2030928

postgres=# select * from pg_stat_subscription;
-[ RECORD 1 ]---------+---------------------------------
subid | 16392
subname | mysub
*pid | 1195*
relid |
received_lsn | 0/2030928
*last_msg_send_time | 2022-02-03 13:30:10.969066+05:30*
*last_msg_receipt_time | 2022-02-03 13:30:10.971322+05:30*
latest_end_lsn | 0/2030928
latest_end_time | 2022-02-03 13:30:10.969066+05:30

postgres=# \q
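
The srsubstate column of pg_subscription_rel shown above holds one-letter codes. As a rough sketch only, the Python below maps the codes to the meanings given in the PostgreSQL 14 catalog documentation and lists the tables that have not reached "r". The function name tables_not_ready is hypothetical, and the rows are typed in by hand rather than fetched from the database.

```python
# Meanings of pg_subscription_rel.srsubstate as documented for PostgreSQL 14.
SRSUBSTATE = {
    "i": "initialize",
    "d": "data is being copied",
    "f": "finished table copy",
    "s": "synchronized",
    "r": "ready (normal replication)",
}


def tables_not_ready(rows):
    """Return (srrelid, description) for every table whose state is not 'r'.

    rows is an iterable of (srrelid, srsubstate) pairs, for example the
    result of: select srrelid, srsubstate from pg_subscription_rel;
    """
    return [
        (relid, SRSUBSTATE.get(state, "unknown state"))
        for relid, state in rows
        if state != "r"
    ]


# The two rows from the healthy subscription in this mail: nothing to report.
print(tables_not_ready([(16389, "r"), (16384, "r")]))

# A made-up row stuck in 'i', like the case reported in this thread.
print(tables_not_ready([(16389, "i"), (16384, "r")]))
```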
postgres@controller:~$ psql -p 5001
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# \x
Expanded display is on.
postgres=# select * from pg_stat_replication;
-[ RECORD 1 ]----+---------------------------------
pid | 1196
usesysid | 10
usename | postgres
application_name | mysub
client_addr |
client_hostname |
client_port | -1
backend_start | 2022-02-03 13:27:40.690802+05:30
backend_xmin |
state | streaming
sent_lsn | 0/2030928
write_lsn | 0/2030928
flush_lsn | 0/2030928
replay_lsn | 0/2030928
write_lag |
flush_lag |
replay_lag |
sync_priority | 0
sync_state | async
reply_time | 2022-02-03 13:30:41.041007+05:30

*# I make a DDL change on the publisher only, which will break the subscription because the tables are no longer in sync*
postgres=# alter table t2 add column col2 int default 0;
ALTER TABLE
postgres=# select * from pg_stat_replication;
-[ RECORD 1 ]----+---------------------------------
pid | 1196
usesysid | 10
usename | postgres
application_name | mysub
client_addr |
client_hostname |
client_port | -1
backend_start | 2022-02-03 13:27:40.690802+05:30
backend_xmin |
state | streaming
sent_lsn | 0/2039BF8
write_lsn | 0/2039BF8
flush_lsn | 0/2039BF8
replay_lsn | 0/2039BF8
write_lag | 00:00:00.001365
flush_lag | 00:00:00.001365
replay_lag | 00:00:00.001365
sync_priority | 0
sync_state | async
reply_time | 2022-02-03 13:31:11.010197+05:30

postgres=# \q
postgres@controller:~$ psql -p 5001
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# insert into t2(id,col2) select 0,0 from generate_series(1, 100) x;
INSERT 0 100
postgres=# \q

postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# select count(1) from t2;
count
-------
*   100 -- note: this should be 200, but the new rows were not applied, so we need to check for errors*
(1 row)

postgres=# \x
Expanded display is on.
postgres=# select * from pg_stat_subscription;
-[ RECORD 1 ]---------+------
subid | 16392
subname | mysub
*pid | -- no pid, so the subscription worker is not running*
relid |
received_lsn |
*last_msg_send_time |*
*last_msg_receipt_time |*
latest_end_lsn |
latest_end_time |

postgres@controller:~$ psql -p 5001
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# \x
Expanded display is on.
postgres=# select * from pg_stat_replication;
(0 rows)

postgres=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+----------
slot_name | mysub
plugin | pgoutput
slot_type | logical
datoid | 13726
database | postgres
temporary | f
*active | f -- active = f: replication is broken, check the logs (strace would have exited because the traced process died)*
active_pid |
xmin |
catalog_xmin | 741
restart_lsn | 0/2039BF8
confirmed_flush_lsn | 0/2039C30
wal_status | reserved
safe_wal_size |
two_phase | f

postgres=# \q
postgres@controller:~$ tail -100 db1.log
2022-02-03 13:32:06.873 IST [1456] STATEMENT: START_REPLICATION SLOT "mysub" LOGICAL 0/0 (proto_version '2', publication_names '"mypub"')
2022-02-03 13:32:11.890 IST [1458] LOG: starting logical decoding for slot "mysub"
2022-02-03 13:32:11.890 IST [1458] DETAIL: Streaming transactions committing after 0/2039C30, reading WAL from 0/2039BF8.
2022-02-03 13:32:11.890 IST [1458] STATEMENT: START_REPLICATION SLOT "mysub" LOGICAL 0/0 (proto_version '2', publication_names '"mypub"')
2022-02-03 13:32:11.891 IST [1458] LOG: logical decoding found consistent point at 0/2039BF8
2022-02-03 13:32:11.891 IST [1458] DETAIL: There are no running transactions.
2022-02-03 13:32:11.891 IST [1458] STATEMENT: START_REPLICATION SLOT "mysub" LOGICAL 0/0 (proto_version '2', publication_names '"mypub"')
2022-02-03 13:32:11.892 IST [1458] LOG: could not send data to client: Broken pipe
2022-02-03 13:33:02.074 IST [1482] CONTEXT: slot "mysub", output plugin "pgoutput", in the change callback, associated LSN 0/203AAD8
2022-02-03 13:33:02.074 IST [1482] STATEMENT: START_REPLICATION SLOT "mysub" LOGICAL 0/0 (proto_version '2', publication_names '"mypub"')

postgres@controller:~$ tail -100 db2.log
2022-02-03 13:24:42.101 IST [1126] LOG: starting PostgreSQL 14.1 (Ubuntu 14.1-2.pgdg20.04+1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.3.0-17ubuntu1~20.04) 9.3.0, 64-bit
2022-02-03 13:24:42.101 IST [1126] LOG: listening on IPv6 address "::1", port 5002
2022-02-03 13:24:42.101 IST [1126] LOG: listening on IPv4 address "127.0.0.1", port 5002
2022-02-03 13:24:42.104 IST [1126] LOG: listening on Unix socket "/var/run/postgresql/.s.PGSQL.5002"
2022-02-03 13:24:42.109 IST [1127] LOG: database system was shut down at 2022-02-03 13:24:25 IST
2022-02-03 13:24:42.112 IST [1126] LOG: database system is ready to accept connections
2022-02-03 13:27:40.690 IST [1195] LOG: logical replication apply worker for subscription "mysub" has started
2022-02-03 13:27:40.694 IST [1197] LOG: logical replication table synchronization worker for subscription "mysub", table "t2" has started
2022-02-03 13:27:40.704 IST [1199] LOG: logical replication table synchronization worker for subscription "mysub", table "t1" has started
2022-02-03 13:27:40.732 IST [1197] LOG: logical replication table synchronization worker for subscription "mysub", table "t2" has finished
2022-02-03 13:27:40.746 IST [1199] LOG: logical replication table synchronization worker for subscription "mysub", table "t1" has finished
2022-02-03 13:31:56.830 IST [1195] ERROR: logical replication target relation "public.t2" is missing replicated column: "col2"
2022-02-03 13:31:56.831 IST [1126] LOG: background worker "logical replication worker" (PID 1195) exited with exit code 1

postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# alter table t2 add column col2 int default 0;
ALTER TABLE
postgres=# \q

postgres@controller:~$ tail -10 db2.log
2022-02-03 13:33:27.156 IST [1495] LOG: logical replication apply worker for subscription "mysub" has started
2022-02-03 13:33:27.164 IST [1495] ERROR: logical replication target relation "public.t2" is missing replicated column: "col2"
2022-02-03 13:33:27.165 IST [1126] LOG: background worker "logical replication worker" (PID 1495) exited with exit code 1
2022-02-03 13:33:32.175 IST [1497] LOG: logical replication apply worker for subscription "mysub" has started
2022-02-03 13:33:32.182 IST [1497] ERROR: logical replication target relation "public.t2" is missing replicated column: "col2"
2022-02-03 13:33:32.183 IST [1126] LOG: background worker "logical replication worker" (PID 1497) exited with exit code 1
2022-02-03 13:33:37.194 IST [1502] LOG: logical replication apply worker for subscription "mysub" has started
2022-02-03 13:33:37.201 IST [1502] ERROR: logical replication target relation "public.t2" is missing replicated column: "col2"
2022-02-03 13:33:37.202 IST [1126] LOG: background worker "logical replication worker" (PID 1502) exited with exit code 1
2022-02-03 13:33:42.212 IST [1504] LOG: logical replication apply worker for subscription "mysub" has started
postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# \x
Expanded display is on.
postgres=# select * from pg_subscription_rel;
-[ RECORD 1 ]---------
srsubid | 16392
srrelid | 16389
srsubstate | r
srsublsn | 0/20308F0
-[ RECORD 2 ]---------
srsubid | 16392
srrelid | 16384
srsubstate | r
srsublsn | 0/2030928

postgres=# select * from pg_stat_subscription;
-[ RECORD 1 ]---------+---------------------------------
subid | 16392
subname | mysub
*pid | 1504*
relid |
received_lsn | 0/2040490
last_msg_send_time | 2022-02-03 13:33:42.235423+05:30
last_msg_receipt_time | 2022-02-03 13:33:42.235432+05:30
latest_end_lsn | 0/2040490
latest_end_time | 2022-02-03 13:33:42.235423+05:30

postgres=# \q
postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# select count(1) from t2;
count
-------
200
(1 row)

postgres=# \q
postgres@controller:~$ ps aux | grep postgres: *# we need to strace the new pids below, as the old processes were terminated*

postgres 1504 0.0 0.7 114904 14972 ? Ss 13:33 0:00 postgres: logical replication worker for subscription 16392
postgres 1505 0.0 0.8 114864 15952 ? Ss 13:33 0:00 postgres: walsender postgres [local] START_REPLICATION
postgres 1513 0.0 0.0 8160 672 pts/0 S+ 13:34 0:00 grep postgres:

*# check that everything is working again*
postgres@controller:~$ psql -p 5001
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# delete from t2; -- as said earlier, this delete fails and needs a fix
ERROR: cannot delete from table "t2" because it does not have a replica identity and publishes deletes
HINT: To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.
postgres=# delete from t1;
DELETE 100
postgres=# alter table t2 replica identity full; -- since it has no primary key or unique key
ALTER TABLE
postgres=# delete from t2;
DELETE 200
postgres=# select count(1) from t2;
count
-------
0
(1 row)

postgres=# select count(1) from t1;
count
-------
0
(1 row)

postgres=# \q
postgres@controller:~$ psql -p 5002
psql (14.1 (Ubuntu 14.1-2.pgdg20.04+1))
Type "help" for help.

postgres=# select count(1) from t1;
count
-------
0
(1 row)

postgres=# select count(1) from t2;
count
-------
0
(1 row)

*So, in short, please share the following.*

From the publisher:
select * from pg_replication_slots;
select * from pg_stat_replication;
select * from pg_stat_activity;
ps aux | grep postgres: # (publisher)

From the subscriber:
select * from pg_stat_subscription;
select * from pg_subscription_rel;
ps aux | grep postgres: # (subscriber)

*Also share the logs of the publisher and subscriber DBs: if there are no pids
for the above, the logs should show the errors.*
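
When going through the logs, a small filter can pull out just the error lines. The Python below is a sketch under assumptions, not a general log parser: it assumes each entry sits on one line and uses a prefix of the form "timestamp timezone [pid] LEVEL:", as in the log excerpts in this mail, and the name find_errors is made up for illustration. A different log_line_prefix would need a different pattern.

```python
import re

# Matches lines such as:
# 2022-02-03 13:31:56.830 IST [1195] ERROR:  logical replication target ...
LOG_LINE = re.compile(
    r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}) (?P<tz>\S+) "
    r"\[(?P<pid>\d+)\] (?P<level>[A-Z]+): +(?P<msg>.*)$"
)

SEVERE = {"ERROR", "FATAL", "PANIC"}


def find_errors(log_text):
    """Return (pid, level, message) for every ERROR/FATAL/PANIC line."""
    hits = []
    for line in log_text.splitlines():
        match = LOG_LINE.match(line)
        if match and match.group("level") in SEVERE:
            hits.append(
                (int(match.group("pid")), match.group("level"), match.group("msg"))
            )
    return hits


# Two lines copied from the subscriber log in this mail.
sample = (
    '2022-02-03 13:27:40.690 IST [1195] LOG:  logical replication apply '
    'worker for subscription "mysub" has started\n'
    '2022-02-03 13:31:56.830 IST [1195] ERROR:  logical replication target '
    'relation "public.t2" is missing replicated column: "col2"\n'
)

for pid, level, message in find_errors(sample):
    print(pid, level, message)
```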
