From: | Evgeny Smirnov <evgeny(dot)v(dot)smirnov(at)gmail(dot)com> |
---|---|
To: | pgsql-sql(at)lists(dot)postgresql(dot)org |
Subject: | Can portals interleave with other portals |
Date: | 2024-03-13 04:38:09 |
Message-ID: | CAB9opTGs-Aa9aQf9kM0xpZHOcGVAeR1rLLkNbH8A-HKD0WJ54w@mail.gmail.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-sql |
Greetings!
A Postgresql backend is capable of operating multiple portals within a
transaction and switching between them on and off. For instance such a code
results in an expected outcome:
```
*// The table *users_Fetch *contains users with ids between 1 and 20*
val fetchedIds = Flux.defer *{*
Flux.usingWhen(
// 1. Establish cursors
databaseClient.sql(
"DECLARE fetch_test1 SCROLL CURSOR FOR SELECT * FROM
users_Fetch;" +
"DECLARE fetch_test2 SCROLL CURSOR FOR SELECT * FROM
users_Fetch;")
.flatMap *{ *_ *-> *Mono.just(databaseClient) *}*,
// 2. Iterate cursors
*{*
*// concat subscribes its internals sequentially*
Flux.concat(
databaseClient.sql("MOVE FORWARD 3 FROM fetch_test1; FETCH
FORWARD 5 FROM fetch_test1;")
.flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("userId")
as Int *} }*,
databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
.flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("userId")
as Int *} }*,
// An extended query select in between
databaseClient.sql("select userId from users_Fetch;")
.flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("userId")
as Int *} }*,
databaseClient.sql("FETCH BACKWARD 5 FROM fetch_test1;")
.flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("userId")
as Int *} }*,
databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
.flatMap *{ *r *-> *r.map *{ *row, _ *-> *row.get("userId")
as Int *} }*
)
*}*,
// 3. Close cursors
*{*
databaseClient.sql(
"CLOSE fetch_test1;" +
"CLOSE fetch_test2;").then()
*}*
)
*}*.`as`(transactionalOperatorFetch::transactional)
assertEquals(
*listOf*(
4, 5, 6, 7, 8, *// MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5
FROM fetch_test1;*
1, 2, 3, 4, 5, *// FETCH FORWARD 5 FROM fetch_test2;*
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
20, *// select userId from users_Fetch;*
7, 6, 5, 4, 3, *// FETCH BACKWARD 5 FROM fetch_test1;*
6, 7, 8, 9, 10), *// FETCH FORWARD 5 FROM fetch_test2;*
fetchedIds.collectList().block()
)
```
Is the same possible for conventional selects issued with extended query
protocol? From the protocol perspective it would result in the following
traffic:
231 53111 5432 PGSQL 109 >Q ———> BEGIN
232 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1 Ack=54 Win=6371 Len=0
TSval=2819351776 TSecr=589492423
237 5432 53111 PGSQL 73 <C/Z
238 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=54 Ack=18 Win=6366 Len=0
TSval=589492435 TSecr=2819351788
// A client issues a select
239 53111 5432 PGSQL 276 >P/B/D/E/H ———> select * from …; bind B_1; execute
B_1, fetch 2 rows; flush
240 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=18 Ack=274 Win=6368 Len=0
TSval=2819351793 TSecr=589492440
245 5432 53111 PGSQL 552 <1/2/T/D/D/s ———> Data, Data, Portal suspended
…
// Then the same sequence for another prepared statement and portal (lets
say B_2) but without a limit in the Execute command and sync at the end.
…
// Then the client proceeds with B_1 till the completion
270 53111 5432 PGSQL 69 > E ———> execute B_1, fetch 2 rows,
271 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=323 Win=6367 Len=0
TSval=2819351846 TSecr=589492493
272 53111 5432 PGSQL 61 >H ———> Flush
274 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=328 Win=6367 Len=0
TSval=2819351846 TSecr=589492493
282 5432 53111 PGSQL 144 <D/C ———> Command completion
283 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=328 Ack=1013 Win=6351 Len=0
TSval=589492496 TSecr=2819351849
284 53111 5432 PGSQL 66 >C ———> Close B_1
285 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=338 Win=6367 Len=0
TSval=2819351849 TSecr=589492496
286 53111 5432 PGSQL 61 >S ———> Sync
287 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=343 Win=6366 Len=0
TSval=2819351849 TSecr=589492496
293 5432 53111 PGSQL 67 <3/Z
294 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=343 Ack=1024 Win=6351 Len=0
TSval=589492498 TSecr=2819351851
295 53111 5432 PGSQL 68 >Q ———> COMMIT
I’m interested because such a communication is intrinsic to r2dbc scenarios
like this
```
val usersWithAccouns = Flux.defer *{*
*// Select all users*
databaseClient.sql("select * from users where userId >= $1 and userId
<= $2")
.bind("$1", 1)
.bind("$2", 255)
.flatMap *{ *r *-> *r.map *{ *row, meta *-> *… *} }*
.flatMap *{ *user *->*
*// For each user select all its accounts*
databaseClient.sql("select login from accounts where userId=$1
limit 1")
.bind("$1", user.id)
.flatMap *{ *r *-> *r.map *{ *row, meta *-> *… *} }*
.reduce …
* }*
*}*.`as`(transactionalOperator::transactional)
```
which results in failure owing to inner requests building up a queue inside
the driver.
Thanks!
From | Date | Subject | |
---|---|---|---|
Next Message | Tom Lane | 2024-03-13 04:55:47 | Re: Can portals interleave with other portals |
Previous Message | Sheryl Prabhu David | 2024-03-12 07:30:57 | Nested loop behaviour with pg_stat_activity |