Can portals interleave with other portals

From: Evgeny Smirnov <evgeny(dot)v(dot)smirnov(at)gmail(dot)com>
To: pgsql-sql(at)lists(dot)postgresql(dot)org
Subject: Can portals interleave with other portals
Date: 2024-03-13 04:38:09
Message-ID: CAB9opTGs-Aa9aQf9kM0xpZHOcGVAeR1rLLkNbH8A-HKD0WJ54w@mail.gmail.com
Lists: pgsql-sql

Greetings!

A PostgreSQL backend can keep several portals open within a single
transaction and switch back and forth between them. For instance, the
following code produces the expected outcome:

```
// The table users_Fetch contains users with ids between 1 and 20
val fetchedIds = Flux.defer {
    Flux.usingWhen(
        // 1. Establish cursors
        databaseClient.sql(
            "DECLARE fetch_test1 SCROLL CURSOR FOR SELECT * FROM users_Fetch;" +
            "DECLARE fetch_test2 SCROLL CURSOR FOR SELECT * FROM users_Fetch;")
            .flatMap { _ -> Mono.just(databaseClient) },
        // 2. Iterate cursors
        {
            // concat subscribes its internals sequentially
            Flux.concat(
                databaseClient.sql("MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                // An extended query select in between
                databaseClient.sql("select userId from users_Fetch;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH BACKWARD 5 FROM fetch_test1;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } }
            )
        },
        // 3. Close cursors
        {
            databaseClient.sql(
                "CLOSE fetch_test1;" +
                "CLOSE fetch_test2;").then()
        }
    )
}.`as`(transactionalOperatorFetch::transactional)

assertEquals(
    listOf(
        4, 5, 6, 7, 8, // MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;
        1, 2, 3, 4, 5, // FETCH FORWARD 5 FROM fetch_test2;
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, // select userId from users_Fetch;
        7, 6, 5, 4, 3, // FETCH BACKWARD 5 FROM fetch_test1;
        6, 7, 8, 9, 10), // FETCH FORWARD 5 FROM fetch_test2;
    fetchedIds.collectList().block()
)
```
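To make the expected ids above easier to check by hand, here is a minimal in-memory sketch of how two independent scroll cursors move. It is only a toy model, not PostgreSQL or driver code: the class `ScrollCursor` and the function `replayCursorCalls` are hypothetical names, and the model assumes positive counts and that position 0 means "before the first row". The plain select in the middle of the test is left out because it does not touch either cursor.

```kotlin
// Toy model of a SCROLL cursor over an in-memory list (hypothetical, for illustration only).
// pos == 0 means "before the first row"; pos == rows.size + 1 means "after the last row".
class ScrollCursor(private val rows: List<Int>) {
    private var pos = 0

    // MOVE FORWARD n: same repositioning as FETCH FORWARD n, but the rows are discarded.
    fun moveForward(n: Int) {
        fetchForward(n)
    }

    // FETCH FORWARD n: return up to n rows after the current position.
    fun fetchForward(n: Int): List<Int> {
        val first = pos + 1
        val last = minOf(pos + n, rows.size)
        pos = if (pos + n > rows.size) rows.size + 1 else pos + n
        return if (first > last) emptyList() else (first..last).map { rows[it - 1] }
    }

    // FETCH BACKWARD n: return up to n rows before the current position, in reverse order.
    fun fetchBackward(n: Int): List<Int> {
        val first = pos - 1
        val last = maxOf(pos - n, 1)
        pos = if (pos - n < 1) 0 else pos - n
        return if (first < last) emptyList() else (first downTo last).map { rows[it - 1] }
    }
}

// Replays the cursor statements from the test above, in the same order.
fun replayCursorCalls(): List<Int> {
    val ids = (1..20).toList()
    val cursor1 = ScrollCursor(ids)
    val cursor2 = ScrollCursor(ids)
    val fetched = mutableListOf<Int>()
    cursor1.moveForward(3)
    fetched.addAll(cursor1.fetchForward(5))   // rows 4..8
    fetched.addAll(cursor2.fetchForward(5))   // rows 1..5
    fetched.addAll(cursor1.fetchBackward(5))  // rows 7 down to 3
    fetched.addAll(cursor2.fetchForward(5))   // rows 6..10
    return fetched
}
```

Each cursor keeps its own position, which is why the calls on `fetch_test1` and `fetch_test2` can alternate without disturbing each other.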

Is the same possible for ordinary SELECTs issued through the extended query
protocol? At the protocol level, it would produce the following traffic:

231 53111 5432 PGSQL 109 >Q ---> BEGIN

232 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1 Ack=54 Win=6371 Len=0 TSval=2819351776 TSecr=589492423

237 5432 53111 PGSQL 73 <C/Z

238 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=54 Ack=18 Win=6366 Len=0 TSval=589492435 TSecr=2819351788

// A client issues a select

239 53111 5432 PGSQL 276 >P/B/D/E/H ---> select * from …; bind B_1; execute B_1, fetch 2 rows; flush

240 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=18 Ack=274 Win=6368 Len=0 TSval=2819351793 TSecr=589492440

245 5432 53111 PGSQL 552 <1/2/T/D/D/s ---> Data, Data, Portal suspended

// Then the same sequence for another prepared statement and portal (let's say B_2), but without a row limit in the Execute command and with a Sync at the end.

// Then the client proceeds with B_1 until completion

270 53111 5432 PGSQL 69 >E ---> execute B_1, fetch 2 rows

271 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=323 Win=6367 Len=0 TSval=2819351846 TSecr=589492493

272 53111 5432 PGSQL 61 >H ---> Flush

274 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=328 Win=6367 Len=0 TSval=2819351846 TSecr=589492493

282 5432 53111 PGSQL 144 <D/C ---> Command completion

283 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=328 Ack=1013 Win=6351 Len=0 TSval=589492496 TSecr=2819351849

284 53111 5432 PGSQL 66 >C ---> Close B_1

285 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=338 Win=6367 Len=0 TSval=2819351849 TSecr=589492496

286 53111 5432 PGSQL 61 >S ---> Sync

287 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=343 Win=6366 Len=0 TSval=2819351849 TSecr=589492496

293 5432 53111 PGSQL 67 <3/Z

294 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=343 Ack=1024 Win=6351 Len=0 TSval=589492498 TSecr=2819351851

295 53111 5432 PGSQL 68 >Q ---> COMMIT
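The exchange I have in mind can be sketched as a small in-memory model. This is only a toy illustration of the question, not backend or driver code: the types `Portal`, `Reply` and the helper `tags` are hypothetical, the row values are made up, and the model simply assumes that each portal keeps its own position independently of the others. The reply letters (D for DataRow, s for PortalSuspended, C for CommandComplete) are the same ones shown in the capture.

```kotlin
// Hypothetical reply types named after the protocol messages seen in the capture.
sealed interface Reply
data class DataRow(val value: Int) : Reply
object PortalSuspended : Reply
object CommandComplete : Reply

// Toy portal: a result set that is consumed incrementally by Execute calls.
class Portal(rows: List<Int>) {
    private val remaining = rows.iterator()

    // Execute with a row limit; maxRows == 0 means "no limit".
    fun execute(maxRows: Int): List<Reply> {
        val out = mutableListOf<Reply>()
        var sent = 0
        while ((maxRows == 0 || sent < maxRows) && remaining.hasNext()) {
            out.add(DataRow(remaining.next()))
            sent++
        }
        // Stopping because the row limit was reached suspends the portal;
        // running out of rows first completes the command.
        out.add(if (maxRows != 0 && sent == maxRows) PortalSuspended else CommandComplete)
        return out
    }
}

// Render replies as the one-letter message tags used in the capture.
fun tags(replies: List<Reply>): String = replies.joinToString("/") { reply ->
    when (reply) {
        is DataRow -> "D"
        is PortalSuspended -> "s"
        is CommandComplete -> "C"
    }
}
```

With a three-row `Portal` standing in for B_1 and any other `Portal` standing in for B_2, calling `execute(2)` on B_1, then `execute(0)` on B_2, then `execute(2)` on B_1 again yields `D/D/s`, a run of `D` ending in `C`, and finally `D/C`, which is the interleaving shown in the capture.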

I’m interested because this kind of exchange is intrinsic to R2DBC scenarios
like this:

```
val usersWithAccounts = Flux.defer {
    // Select all users
    databaseClient.sql("select * from users where userId >= $1 and userId <= $2")
        .bind("$1", 1)
        .bind("$2", 255)
        .flatMap { r -> r.map { row, meta -> … } }
        .flatMap { user ->
            // For each user select all its accounts
            databaseClient.sql("select login from accounts where userId=$1 limit 1")
                .bind("$1", user.id)
                .flatMap { r -> r.map { row, meta -> … } }
                .reduce …
        }
}.`as`(transactionalOperator::transactional)
```

This fails because the inner requests pile up in a queue inside the driver.

Thanks!
