Re: Parallel Inserts in CREATE TABLE AS

From: Zhihong Yu <zyu(at)yugabyte(dot)com>
To: Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Luc Vlaming <luc(at)swarm64(dot)com>, "Hou, Zhijie" <houzj(dot)fnst(at)cn(dot)fujitsu(dot)com>, PostgreSQL-development <pgsql-hackers(at)postgresql(dot)org>
Subject: Re: Parallel Inserts in CREATE TABLE AS
Date: 2020-12-07 03:27:18
Message-ID: CALNJ-vT-HWeX2+JfGpme0e67U3VRj8GNEfcQJLw82n170y7ZEg@mail.gmail.com
Lists: pgsql-hackers

Hi Bharath:

+ (void) SetCurrentCommandIdUsedForWorker();
+ myState->output_cid = GetCurrentCommandId(false);

SetCurrentCommandIdUsedForWorker() already returns void, so the '(void)'
cast is not needed.
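That is, simply:

+ SetCurrentCommandIdUsedForWorker();
+ myState->output_cid = GetCurrentCommandId(false);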

+ * rd_createSubid is marked invalid, otherwise, the table is
+ * not allowed to extend by the workers.

nit: to extend by the workers -> to be extended by the workers

For IsParallelInsertInCTASAllowed, the logic is inside the 'if (IS_CTAS(into))'
block. You can return false when (!IS_CTAS(into)) - this would save some
indentation for the body.

+ if (rel && rel->relpersistence != RELPERSISTENCE_TEMP)
+ allowed = true;

Similarly, when the above condition doesn't hold, you can return false
directly - reducing the next if condition to 'if (queryDesc)'.
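
For illustration, the restructured function could look roughly like this
(just a sketch of the control flow; declarations and the relation lookup are
elided, and the details are assumptions rather than the actual patch):

static bool
IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc)
{
    if (!IS_CTAS(into))
        return false;

    /* ... look up the target relation as the patch does today ... */

    /* Only permanent relations can be extended by parallel workers. */
    if (!rel || rel->relpersistence == RELPERSISTENCE_TEMP)
        return false;

    if (!queryDesc)
        return false;

    ps = queryDesc->planstate;
    plannedstmt = queryDesc->plannedstmt;

    return (ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
            plannedstmt->parallelModeNeeded &&
            plannedstmt->planTree &&
            IsA(plannedstmt->planTree, Gather) &&
            plannedstmt->planTree->lefttree &&
            plannedstmt->planTree->lefttree->parallel_aware &&
            plannedstmt->planTree->lefttree->parallel_safe);
}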

+ if (!(ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
+ plannedstmt->parallelModeNeeded &&
+ plannedstmt->planTree &&
+ IsA(plannedstmt->planTree, Gather) &&
+ plannedstmt->planTree->lefttree &&
+ plannedstmt->planTree->lefttree->parallel_aware &&
+ plannedstmt->planTree->lefttree->parallel_safe))

The composite condition is negated. Maybe you can write it without negation:

+ return (ps && IsA(ps, GatherState) && !ps->ps_ProjInfo &&
+ plannedstmt->parallelModeNeeded &&
+ plannedstmt->planTree &&
+ IsA(plannedstmt->planTree, Gather) &&
+ plannedstmt->planTree->lefttree &&
+ plannedstmt->planTree->lefttree->parallel_aware &&
+ plannedstmt->planTree->lefttree->parallel_safe);

+ * Write out the number of tuples this worker has inserted. Leader will use
+ * it to inform to the end client.

'inform to the end client' -> 'inform the end client' (without to)

Cheers

On Sun, Dec 6, 2020 at 4:37 PM Bharath Rupireddy <
bharath(dot)rupireddyforpostgres(at)gmail(dot)com> wrote:

> Thanks Amit for the review comments.
>
> On Sat, Dec 5, 2020 at 4:27 PM Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>
> wrote:
> >
> > > If I'm not wrong, I think currently we have no exec nodes for DDLs.
> > > I'm not sure whether we would like to introduce one for this.
> >
> > Yeah, I am also not in favor of having an executor node for CTAS but
> > OTOH, I also don't like the way you have jammed the relevant
> > information in generic PlanState. How about keeping it in GatherState
> > and initializing it in ExecCreateTableAs() after the executor starts.
> > You are already doing special treatment for the Gather node in
> > ExecCreateTableAs (via IsParallelInsertInCTASAllowed) so we can as
> > well initialize the required information in GatherState in
> > ExecCreateTableAs. I think that might help in reducing the special
> > treatment for intoclause at different places.
> >
>
> Done. Added the required info to the GatherState node. While this reduced
> the changes at many other places, I had to pass the into clause and
> object id to ExecInitParallelPlan() as we do not send the GatherState node
> to it. Hope that's okay.
>
> >
> > Few other assorted comments:
> > =========================
> > 1.
> > This looks a bit odd. The function name
> > 'IsParallelInsertInCTASAllowed' suggests that it just checks whether
> > parallelism is allowed but it is internally changing the plan_rows. It
> > might be better to do this separately if the parallelism is allowed.
> >
>
> Changed.
>
> >
> > 2.
> > static void ExecShutdownGatherWorkers(GatherState *node);
> > -
> > +static void ExecParallelInsertInCTAS(GatherState *node);
> >
> > Spurious line removal.
> >
>
> Corrected.
>
> >
> > 3.
> > The comment and code appear a bit misleading as the function seems to
> > shut down the workers rather than waiting for them to finish. How about
> > using something like below:
> >
> > /*
> > * Next, accumulate buffer and WAL usage. (This must wait for the workers
> > * to finish, or we might get incomplete data.)
> > */
> > if (nworkers > 0)
> > {
> > int i;
> >
> > /* Wait for all vacuum workers to finish */
> > WaitForParallelWorkersToFinish(lps->pcxt);
> >
> > for (i = 0; i < lps->pcxt->nworkers_launched; i++)
> > InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
> > }
> >
> > This is how it works for parallel vacuum.
> >
>
> Done.
>
> >
> > 4.
> > The above comment doesn't seem to convey what it intends to convey.
> > How about changing it slightly as: "We don't compute the
> > parallel_tuple_cost for CTAS because the number of tuples that are
> > transferred from workers to the gather node is zero as each worker
> > parallelly inserts the tuples that result from its chunk of plan
> > execution. This change may make the parallel plan the cheapest among all
> > other plans, and influence the planner to consider this parallel
> > plan."
> >
>
> Changed.
>
> >
> > Then, we can also have an Assert for path->path.rows to zero for the
> CTAS case.
> >
>
> We cannot have Assert(path->path.rows == 0), because we are not
> changing this parameter upstream in or before the planning phase. We
> are just not taking it into account for CTAS. We would have to add
> extra checks in various places if we wanted the planner to set
> path->path.rows to 0 for CTAS. IMHO, that's not necessary. We can just
> skip taking this value into account in cost_gather. Thoughts?
>
> >
> > 5.
> > + /* Prallel inserts in CTAS related info is specified below. */
> > + IntoClause *intoclause;
> > + Oid objectid;
> > + DestReceiver *dest;
> > } PlanState;
> >
> > Typo. /Prallel/Parallel
> >
>
> Corrected.
>
> >
> > 6.
> > Currently, it seems the plan looks like:
> > Gather (actual time=970.524..972.913 rows=0 loops=1)
> > -> Create t1_test
> > Workers Planned: 2
> > Workers Launched: 2
> > -> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3)
> >
> > I would prefer it to be:
> > Gather (actual time=970.524..972.913 rows=0 loops=1)
> > Workers Planned: 2
> > Workers Launched: 2
> > -> Create t1_test
> > -> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3)
> >
> > This way it looks like the writing part is done below the Gather node
> > and it will also match Greg's Parallel Insert patch.
> >
>
> Done.
>
> Attaching v7 patch. Please review it further.
>
> With Regards,
> Bharath Rupireddy.
> EnterpriseDB: http://www.enterprisedb.com
>
