From ce7a68c951344aa2163fb590ecdf3bb0649b83ae Mon Sep 17 00:00:00 2001 From: "Andrey V. Lepikhov" Date: Fri, 2 Jul 2021 16:20:19 +0300 Subject: [PATCH] Do not take a tuple immediately after finishing of async request if this request was initialized by another append node. --- contrib/postgres_fdw/connection.c | 4 +-- .../postgres_fdw/expected/postgres_fdw.out | 36 +++++++++++++++++++ contrib/postgres_fdw/postgres_fdw.c | 25 ++++++++++--- contrib/postgres_fdw/postgres_fdw.h | 2 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 22 ++++++++++++ 5 files changed, 81 insertions(+), 8 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 82aa14a65d..163825f4d7 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -203,7 +203,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { /* Process a pending asynchronous request if any. */ if (entry->state.pendingAreq) - process_pending_request(entry->state.pendingAreq); + process_pending_request(entry->state.pendingAreq, true); /* Start a new transaction or subtransaction if needed. */ begin_remote_xact(entry); } @@ -686,7 +686,7 @@ pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state) { /* First, process a pending asynchronous request, if any. */ if (state && state->pendingAreq) - process_pending_request(state->pendingAreq); + process_pending_request(state->pendingAreq, true); /* * Submit a query. Since we don't use non-blocking mode, this also can diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 31b5de91ad..9386453e60 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -10470,7 +10470,43 @@ SELECT * FROM async_pt; -> Seq Scan on async_p3 async_pt_3 (actual rows=0 loops=1) (4 rows) +-- +-- The case when we have the Async Append in the +-- qual of the Async ForeignScan node. +-- +CREATE TABLE test (x int) PARTITION BY HASH (x); +CREATE TABLE test_1 (x int); +CREATE TABLE test_2 (x int); +CREATE FOREIGN TABLE ftest_1 PARTITION OF test + FOR VALUES WITH (modulus 2, remainder 0) + SERVER loopback OPTIONS (table_name 'test_1'); +CREATE FOREIGN TABLE ftest_2 PARTITION OF test + FOR VALUES WITH (modulus 2, remainder 1) + SERVER loopback2 OPTIONS (table_name 'test_2'); +INSERT INTO test (SELECT * FROM generate_series(1,10)); +ANALYZE test,test_1,test_2,ftest_1,ftest_2; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM test WHERE x NOT IN ( + SELECT avg(x) FROM test WHERE x > 10 +); + QUERY PLAN +---------------------------------------------------------------------------------------- + Append (actual rows=0 loops=1) + -> Async Foreign Scan on ftest_1 test_1 (actual rows=0 loops=1) + Filter: (NOT (hashed SubPlan 1)) + Rows Removed by Filter: 2 + SubPlan 1 + -> Aggregate (actual rows=1 loops=2) + -> Append (actual rows=0 loops=2) + -> Async Foreign Scan on ftest_1 test_4 (actual rows=0 loops=2) + -> Async Foreign Scan on ftest_2 test_5 (actual rows=0 loops=2) + -> Async Foreign Scan on ftest_2 test_2 (actual rows=0 loops=1) + Filter: (NOT (hashed SubPlan 1)) + Rows Removed by Filter: 8 +(12 rows) + -- Clean up +DROP TABLE test CASCADE; DROP TABLE async_pt; DROP TABLE base_tbl1; DROP TABLE base_tbl2; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index fafbab6b02..1c4ae328d0 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -3696,7 +3696,7 @@ create_cursor(ForeignScanState *node) /* First, process a pending asynchronous request, if any. */ if (fsstate->conn_state->pendingAreq) - process_pending_request(fsstate->conn_state->pendingAreq); + process_pending_request(fsstate->conn_state->pendingAreq, true); /* * Construct array of query parameter values in text format. We do the @@ -4066,7 +4066,7 @@ execute_foreign_modify(EState *estate, /* First, process a pending asynchronous request, if any. */ if (fmstate->conn_state->pendingAreq) - process_pending_request(fmstate->conn_state->pendingAreq); + process_pending_request(fmstate->conn_state->pendingAreq, true); /* * If the existing query was deparsed and prepared for a different number @@ -4509,7 +4509,7 @@ execute_dml_stmt(ForeignScanState *node) /* First, process a pending asynchronous request, if any. */ if (dmstate->conn_state->pendingAreq) - process_pending_request(dmstate->conn_state->pendingAreq); + process_pending_request(dmstate->conn_state->pendingAreq, true); /* * Construct array of query parameter values in text format. @@ -6850,7 +6850,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq) */ if (GetNumRegisteredWaitEvents(set) > 1) return; - process_pending_request(pendingAreq); + process_pending_request(pendingAreq, false); fetch_more_data_begin(areq); } else if (pendingAreq->requestee != areq->requestee) @@ -6995,7 +6995,7 @@ fetch_more_data_begin(AsyncRequest *areq) * Process a pending asynchronous request. */ void -process_pending_request(AsyncRequest *areq) +process_pending_request(AsyncRequest *areq, bool fetch) { ForeignScanState *node = (ForeignScanState *) areq->requestee; PgFdwScanState *fsstate PG_USED_FOR_ASSERTS_ONLY = (PgFdwScanState *) node->fdw_state; @@ -7015,6 +7015,21 @@ process_pending_request(AsyncRequest *areq) fetch_more_data(node); + /* + * If the request are made by another append we will only prepare connection + * for the next query and don't take a tuple immediately. It is needed to + * prevent possible recursion into a qual subplan. + */ + if (!fetch) + { + AppendState *node = (AppendState *) areq->requestor; + + ExecAsyncRequestDone(areq, NULL); + node->as_needrequest = bms_add_member(node->as_needrequest, + areq->request_index); + return; + } + /* We need to send a new query afterwards; don't fetch */ produce_tuple_asynchronously(areq, false); diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 9591c0f6c2..02c9c7bfa6 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -137,7 +137,7 @@ typedef struct PgFdwConnState /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); -extern void process_pending_request(AsyncRequest *areq); +extern void process_pending_request(AsyncRequest *areq, bool fetch); /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 286dd99573..38caf96cc8 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -3328,7 +3328,29 @@ DELETE FROM async_p3; EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM async_pt; +-- +-- The case when we have the Async Append in the +-- qual of the Async ForeignScan node. +-- +CREATE TABLE test (x int) PARTITION BY HASH (x); +CREATE TABLE test_1 (x int); +CREATE TABLE test_2 (x int); +CREATE FOREIGN TABLE ftest_1 PARTITION OF test + FOR VALUES WITH (modulus 2, remainder 0) + SERVER loopback OPTIONS (table_name 'test_1'); +CREATE FOREIGN TABLE ftest_2 PARTITION OF test + FOR VALUES WITH (modulus 2, remainder 1) + SERVER loopback2 OPTIONS (table_name 'test_2'); +INSERT INTO test (SELECT * FROM generate_series(1,10)); +ANALYZE test,test_1,test_2,ftest_1,ftest_2; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM test WHERE x NOT IN ( + SELECT avg(x) FROM test WHERE x > 10 +); + -- Clean up +DROP TABLE test CASCADE; DROP TABLE async_pt; DROP TABLE base_tbl1; DROP TABLE base_tbl2; -- 2.31.1