From: | vignesh C <vignesh21(at)gmail(dot)com> |
---|---|
To: | Bharath Rupireddy <bharath(dot)rupireddyforpostgres(at)gmail(dot)com> |
Cc: | Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Ashutosh Sharma <ashu(dot)coek88(at)gmail(dot)com>, Andres Freund <andres(at)anarazel(dot)de>, Robert Haas <robertmhaas(at)gmail(dot)com>, Ants Aasma <ants(at)cybertec(dot)at>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Alastair Turner <minion(at)decodable(dot)me>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org> |
Subject: | Re: Parallel copy |
Date: | 2020-07-07 05:50:51 |
Message-ID: | CALDaNm3AaXUZLLHjxcEeiFLtDZbrXgSUQMhz2oBeO5_4JCm7yQ@mail.gmail.com |
Views: | Raw Message | Whole Thread | Download mbox | Resend email |
Thread: | |
Lists: | pgsql-hackers |
On Wed, Jun 24, 2020 at 1:41 PM Bharath Rupireddy
<bharath(dot)rupireddyforpostgres(at)gmail(dot)com> wrote:
>
> Along with the review comments addressed
> patch(0006-Parallel-Copy-For-Binary-Format-Files.patch) also attaching
> all other latest series of patches(0001 to 0005) from [1], the order
> of applying patches is from 0001 to 0006.
>
> [1] https://www.postgresql.org/message-id/CALDaNm0H3N9gK7CMheoaXkO99g%3DuAPA93nSZXu0xDarPyPY6sg%40mail.gmail.com
>
Some comments:
+ movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
+
+ cstate->pcdata->curr_data_block->skip_bytes = movebytes;
+
+ data_block = &pcshared_info->data_blocks[block_pos];
+
+ if (movebytes > 0)
+ memmove(&data_block->data[0],
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
+ movebytes);
we can create a local variable and use in place of
cstate->pcdata->curr_data_block.
+ if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+ AdjustFieldInfo(cstate, 1);
+
+ memcpy(&fld_count,
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
sizeof(fld_count));
Should this be like below, as the remaining size can fit in current block:
if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE)
+ if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
+ {
+ AdjustFieldInfo(cstate, 2);
+ *new_block_pos = pcshared_info->cur_block_pos;
+ }
Same like above.
+ movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
+
+ cstate->pcdata->curr_data_block->skip_bytes = movebytes;
+
+ data_block = &pcshared_info->data_blocks[block_pos];
+
+ if (movebytes > 0)
Instead of the above check, we can have an assert check for movebytes.
+ if (mode == 1)
+ {
+ cstate->pcdata->curr_data_block = data_block;
+ cstate->raw_buf_index = 0;
+ }
+ else if(mode == 2)
+ {
+ ParallelCopyDataBlock *prev_data_block = NULL;
+ prev_data_block = cstate->pcdata->curr_data_block;
+ prev_data_block->following_block = block_pos;
+ cstate->pcdata->curr_data_block = data_block;
+
+ if (prev_data_block->curr_blk_completed == false)
+ prev_data_block->curr_blk_completed = true;
+
+ cstate->raw_buf_index = 0;
+ }
This code is common for both, keep in common flow and remove if (mode == 1)
cstate->pcdata->curr_data_block = data_block;
cstate->raw_buf_index = 0;
+#define CHECK_FIELD_COUNT \
+{\
+ if (fld_count == -1) \
+ { \
+ if (IsParallelCopy() && \
+ !IsLeader()) \
+ return true; \
+ else if (IsParallelCopy() && \
+ IsLeader()) \
+ { \
+ if
(cstate->pcdata->curr_data_block->data[cstate->raw_buf_index +
sizeof(fld_count)] != 0) \
+ ereport(ERROR, \
+
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \
+ errmsg("received copy
data after EOF marker"))); \
+ return true; \
+ } \
We only copy sizeof(fld_count), Shouldn't we check fld_count !=
cstate->max_fields? Am I missing something here?
+ if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
+ {
+ AdjustFieldInfo(cstate, 2);
+ *new_block_pos = pcshared_info->cur_block_pos;
+ }
+
+ memcpy(&fld_size,
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
sizeof(fld_size));
+
+ cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_size);
+
+ fld_size = (int32) pg_ntoh32(fld_size);
+
+ if (fld_size == 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("unexpected EOF in COPY data")));
+
+ if (fld_size < -1)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("invalid field size")));
+
+ if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size)
+ {
+ cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
+ }
We can keep the check like cstate->raw_buf_index + fld_size < ..., for
better readability and consistency.
+static pg_attribute_always_inline void
+CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
+ Oid typioparam, int32 typmod, uint32 *new_block_pos,
+ int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
+ ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
flinfo, typioparam & typmod is not used, we can remove the parameter.
+static pg_attribute_always_inline void
+CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
+ Oid typioparam, int32 typmod, uint32 *new_block_pos,
+ int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
+ ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
I felt this function need not be an inline function.
+ /* binary format */
+ /* for paralle copy leader, fill in the error
There are some typos, run spell check
+ /* raw_buf_index should never cross data block size,
+ * as the required number of data blocks would have
+ * been obtained in the above while loop.
+ */
There are few places, commenting style should be changed to postgres style
+ if (cstate->pcdata->curr_data_block == NULL)
+ {
+ block_pos = WaitGetFreeCopyBlock(pcshared_info);
+
+ cstate->pcdata->curr_data_block =
&pcshared_info->data_blocks[block_pos];
+
+ cstate->raw_buf_index = 0;
+
+ readbytes = CopyGetData(cstate,
&cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE);
+
+ elog(DEBUG1, "LEADER - bytes read from file %d", readbytes);
+
+ if (cstate->reached_eof)
+ return true;
+ }
There are many empty lines, these are not required.
+ if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+ AdjustFieldInfo(cstate, 1);
+
+ memcpy(&fld_count,
&cstate->pcdata->curr_data_block->data[cstate->raw_buf_index],
sizeof(fld_count));
+
+ fld_count = (int16) pg_ntoh16(fld_count);
+
+ CHECK_FIELD_COUNT;
+
+ cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
+ new_block_pos = pcshared_info->cur_block_pos;
You can run pg_indent once for the changes.
+ if (mode == 1)
+ {
+ cstate->pcdata->curr_data_block = data_block;
+ cstate->raw_buf_index = 0;
+ }
+ else if(mode == 2)
+ {
Could use macros for 1 & 2 for better readability.
+ if (tuple_start_info_ptr->block_id ==
tuple_end_info_ptr->block_id)
+ {
+ elog(DEBUG1,"LEADER - tuple lies in a single
data block");
+
+ *line_size = tuple_end_info_ptr->offset -
tuple_start_info_ptr->offset + 1;
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts,
1);
+ }
+ else
+ {
+ uint32 following_block_id =
pcshared_info->data_blocks[tuple_start_info_ptr->block_id].following_block;
+
+ elog(DEBUG1,"LEADER - tuple is spread across
data blocks");
+
+ *line_size = DATA_BLOCK_SIZE -
tuple_start_info_ptr->offset -
+
pcshared_info->data_blocks[tuple_start_info_ptr->block_id].skip_bytes;
+
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[tuple_start_info_ptr->block_id].unprocessed_line_parts,
1);
+
+ while (following_block_id !=
tuple_end_info_ptr->block_id)
+ {
+ *line_size = *line_size +
DATA_BLOCK_SIZE -
pcshared_info->data_blocks[following_block_id].skip_bytes;
+
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts,
1);
+
+ following_block_id =
pcshared_info->data_blocks[following_block_id].following_block;
+
+ if (following_block_id == -1)
+ break;
+ }
+
+ if (following_block_id != -1)
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts,
1);
+
+ *line_size = *line_size +
tuple_end_info_ptr->offset + 1;
+ }
We could calculate the size as we parse and identify one record, if we
do that way this can be removed.
Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From | Date | Subject | |
---|---|---|---|
Next Message | Bharath Rupireddy | 2020-07-07 06:35:31 | Issue with cancel_before_shmem_exit while searching to remove a particular registered exit callbacks |
Previous Message | Michael Paquier | 2020-07-07 05:49:44 | Re: Cache lookup errors with functions manipulation object addresses |