Re: Parallel copy

From: Andres Freund <andres(at)anarazel(dot)de>
To: vignesh C <vignesh21(at)gmail(dot)com>
Cc: Amit Kapila <amit(dot)kapila16(at)gmail(dot)com>, Robert Haas <robertmhaas(at)gmail(dot)com>, Ants Aasma <ants(at)cybertec(dot)at>, Tomas Vondra <tomas(dot)vondra(at)2ndquadrant(dot)com>, Alastair Turner <minion(at)decodable(dot)me>, Thomas Munro <thomas(dot)munro(at)gmail(dot)com>, PostgreSQL Hackers <pgsql-hackers(at)lists(dot)postgresql(dot)org>
Subject: Re: Parallel copy
Date: 2020-06-03 19:14:48
Message-ID: 20200603191448.ee6ypzbz2zvpvizi@alap3.anarazel.de
Lists: pgsql-hackers

Hi,

On 2020-06-03 15:53:24 +0530, vignesh C wrote:
> Exec time in seconds (speedup relative to 0 workers):
>
> (A) copy from file, 2 indexes on integer columns, 1 index on text column
> (B) copy from stdin, 2 indexes on integer columns, 1 index on text column
> (C) copy from file, 1 gist index on text column
> (D) copy from file, 3 indexes on integer columns
> (E) copy from stdin, 3 indexes on integer columns
>
> Workers  (A)               (B)               (C)              (D)              (E)
> 0        1162.772 (1X)     1176.035 (1X)     827.669 (1X)     216.171 (1X)     217.376 (1X)
> 1        1110.288 (1.05X)  1120.556 (1.05X)  747.384 (1.11X)  174.242 (1.24X)  163.492 (1.33X)
> 2        635.249 (1.83X)   668.18 (1.76X)    435.673 (1.9X)   133.829 (1.61X)  126.516 (1.72X)
> 4        336.835 (3.45X)   346.768 (3.39X)   236.406 (3.5X)   105.767 (2.04X)  107.382 (2.02X)
> 8        188.577 (6.17X)   194.491 (6.04X)   148.962 (5.56X)  100.708 (2.15X)  107.72 (2.01X)
> 16       126.819 (9.17X)   146.402 (8.03X)   119.923 (6.9X)   97.996 (2.2X)    106.531 (2.04X)
> 20       *117.845 (9.87X)* 149.203 (7.88X)   138.741 (5.96X)  97.94 (2.21X)    107.5 (2.02X)
> 30       127.554 (9.11X)   161.218 (7.29X)   172.443 (4.8X)   98.232 (2.2X)    108.778 (1.99X)

Hm. You don't explicitly mention that in your design, but given how
small the benefit of going from 0 to 1 workers is, I assume the leader
doesn't do any "chunk processing" on its own?

> Design of the Parallel Copy: The backend to which the "COPY FROM" query is
> submitted acts as the leader. It is responsible for reading data from the
> file/stdin and for launching at most n workers, as specified with the
> PARALLEL 'n' option in the "COPY FROM" query. The leader populates the
> common data required for the workers' execution in the DSM and shares it
> with the workers. The leader then executes before-statement triggers, if
> any exist. The leader populates DSM chunks, each of which includes the start
> offset and chunk size; while populating the chunks, it reads as many blocks
> as required from the file into the DSM data blocks. Each block is 64K in
> size. The leader parses the data to identify a chunk, using the existing
> logic from CopyReadLineText with some changes. The leader checks whether a
> free chunk is available to copy the information into; if there is no free
> chunk, it waits until the required chunk is freed up by a worker and then
> copies the identified chunk's information (offset & chunk size) into the
> DSM chunks. This process is repeated until the complete file is processed.
> Simultaneously, the workers cache the chunks (50 at a time) in local memory
> and release the chunks to the leader for further populating. Each worker
> processes the chunks it cached and inserts them into the table. The leader
> waits until all the populated chunks are processed by the workers and then
> exits.
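The chunk hand-off described above can be made concrete with a minimal C11 sketch of one ring slot's life cycle. It assumes a single leader and reuses the patch's ChunkState values.

`ChunkBoundary` is trimmed down here. `leader_populate`, `worker_claim` and `worker_release` are hypothetical helpers, not functions from the patch. The real ring lives in DSM, and the real leader waits rather than returning false.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define RING_SIZE 8             /* tiny for illustration; patch uses RINGSIZE */

typedef enum ChunkState
{
    CHUNK_INIT,                 /* initial state of chunk */
    CHUNK_LEADER_POPULATING,    /* leader processing chunk */
    CHUNK_LEADER_POPULATED,     /* leader completed populating chunk */
    CHUNK_WORKER_PROCESSING,    /* worker processing chunk */
    CHUNK_WORKER_PROCESSED      /* worker completed processing chunk */
} ChunkState;

/* Hypothetical trimmed-down slot: where the chunk starts and how long it is. */
typedef struct ChunkBoundary
{
    uint32_t start_offset;
    _Atomic uint32_t chunk_size;
    _Atomic int chunk_state;    /* holds a ChunkState value */
} ChunkBoundary;

static ChunkBoundary ring[RING_SIZE];   /* zero-initialized: all CHUNK_INIT */

/* Leader only: publish a chunk; false means the slot is still in use. */
static bool
leader_populate(uint32_t pos, uint32_t offset, uint32_t size)
{
    ChunkBoundary *c = &ring[pos % RING_SIZE];
    int state = atomic_load(&c->chunk_state);

    if (state != CHUNK_INIT && state != CHUNK_WORKER_PROCESSED)
        return false;           /* real code would wait for a worker here */

    atomic_store(&c->chunk_state, CHUNK_LEADER_POPULATING);
    c->start_offset = offset;
    atomic_store(&c->chunk_size, size);
    /* seq_cst store: a worker that sees POPULATED also sees offset/size */
    atomic_store(&c->chunk_state, CHUNK_LEADER_POPULATED);
    return true;
}

/* Any worker: the CAS guarantees exactly one worker wins a populated chunk. */
static bool
worker_claim(uint32_t pos, uint32_t *size_out)
{
    ChunkBoundary *c = &ring[pos % RING_SIZE];
    int expected = CHUNK_LEADER_POPULATED;

    if (!atomic_compare_exchange_strong(&c->chunk_state, &expected,
                                        CHUNK_WORKER_PROCESSING))
        return false;
    *size_out = atomic_load(&c->chunk_size);
    return true;
}

/* Worker: after inserting the rows, hand the slot back to the leader. */
static void
worker_release(uint32_t pos)
{
    ChunkBoundary *c = &ring[pos % RING_SIZE];

    assert(atomic_load(&c->chunk_state) == CHUNK_WORKER_PROCESSING);
    atomic_store(&c->chunk_state, CHUNK_WORKER_PROCESSED);
}
```

Once all RING_SIZE slots are populated, a further leader_populate() on a slot returns false. It keeps returning false until a worker has claimed and released that slot.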

Why do we need the local copy of 50 chunks? Copying memory around is far
from free. I don't see why it'd be better to add per-process caching
rather than making the DSM bigger. I can see some benefit in marking
multiple chunks as being processed with one lock acquisition, but I
don't think adding a memory copy is a good idea.
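Here is one way the batching benefit could be had without the memory copy, sketched under assumptions. `SharedRing` is a hypothetical ring in shared memory, guarded by a C11 `atomic_flag` spinlock that stands in for a real PostgreSQL lock.

A worker claims up to N consecutive populated chunks in a single lock acquisition and gets back only ring positions. It then reads the chunk data in place from the shared blocks. Flow control (the leader waiting for processed slots) is not modelled.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>

#define RING_SIZE 16            /* hypothetical; tiny for illustration */

typedef enum { SLOT_EMPTY, SLOT_POPULATED, SLOT_CLAIMED } SlotState;

/* Hypothetical shared-memory ring; all fields are protected by "lock". */
typedef struct SharedRing
{
    atomic_flag lock;
    uint32_t    next_to_claim;  /* next ring position a worker may take */
    uint32_t    populated_upto; /* positions below this are populated */
    SlotState   state[RING_SIZE];
} SharedRing;

static void
ring_lock(SharedRing *r)
{
    while (atomic_flag_test_and_set(&r->lock))
        ;                       /* spin; real code would use a PG lock */
}

static void
ring_unlock(SharedRing *r)
{
    atomic_flag_clear(&r->lock);
}

static void
ring_init(SharedRing *r)
{
    atomic_flag_clear(&r->lock);
    r->next_to_claim = r->populated_upto = 0;
    for (int i = 0; i < RING_SIZE; i++)
        r->state[i] = SLOT_EMPTY;
}

/* Leader: mark the next nchunks ring positions as populated. */
static void
leader_publish(SharedRing *r, uint32_t nchunks)
{
    ring_lock(r);
    /* sketch only guards unclaimed slots; no waiting for processed ones */
    assert(r->populated_upto + nchunks - r->next_to_claim <= RING_SIZE);
    for (uint32_t i = 0; i < nchunks; i++)
        r->state[r->populated_upto++ % RING_SIZE] = SLOT_POPULATED;
    ring_unlock(r);
}

/*
 * Worker: claim up to max_batch consecutive chunks under ONE lock
 * acquisition. Only the first position and the count are returned; the
 * chunk data stays in the shared blocks and is read in place, not copied.
 */
static uint32_t
claim_batch(SharedRing *r, uint32_t max_batch, uint32_t *first_pos)
{
    uint32_t n = 0;

    ring_lock(r);
    *first_pos = r->next_to_claim;
    while (n < max_batch && r->next_to_claim < r->populated_upto)
    {
        r->state[r->next_to_claim++ % RING_SIZE] = SLOT_CLAIMED;
        n++;
    }
    ring_unlock(r);
    return n;
}
```

With 5 chunks published, two calls to claim_batch(r, 3, &first) return 3 (starting at 0) and then 2 (starting at 3).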

This patch *desperately* needs to be split up. IMO it is close to
unreviewable, because a large amount of changes that just move code
around, without any functional change, is mixed in with the actual
new stuff.

> /*
> + * State of the chunk.
> + */
> +typedef enum ChunkState
> +{
> + CHUNK_INIT, /* initial state of chunk */
> + CHUNK_LEADER_POPULATING, /* leader processing chunk */
> + CHUNK_LEADER_POPULATED, /* leader completed populating chunk */
> + CHUNK_WORKER_PROCESSING, /* worker processing chunk */
> + CHUNK_WORKER_PROCESSED /* worker completed processing chunk */
> +}ChunkState;
> +
> +#define RAW_BUF_SIZE 65536 /* we palloc RAW_BUF_SIZE+1 bytes */
> +
> +#define DATA_BLOCK_SIZE RAW_BUF_SIZE
> +#define RINGSIZE (10 * 1000)
> +#define MAX_BLOCKS_COUNT 1000
> +#define WORKER_CHUNK_COUNT 50 /* should be mod of RINGSIZE */
> +
> +#define IsParallelCopy() (cstate->is_parallel)
> +#define IsLeader() (cstate->pcdata->is_leader)
> +#define IsHeaderLine() (cstate->header_line && cstate->cur_lineno == 1)
> +
> +/*
> + * Copy data block information.
> + */
> +typedef struct CopyDataBlock
> +{
> + /* The number of unprocessed chunks in the current block. */
> + pg_atomic_uint32 unprocessed_chunk_parts;
> +
> + /*
> + * If the current chunk data is continued into another block,
> + * following_block will have the position where the remaining data need to
> + * be read.
> + */
> + uint32 following_block;
> +
> + /*
> + * This flag will be set, when the leader finds out this block can be read
> + * safely by the worker. This helps the worker to start processing the chunk
> + * early where the chunk will be spread across many blocks and the worker
> + * need not wait for the complete chunk to be processed.
> + */
> + bool curr_blk_completed;
> + char data[DATA_BLOCK_SIZE + 1]; /* data read from file */
> +}CopyDataBlock;

What's the + 1 here about?

> +/*
> + * Parallel copy line buffer information.
> + */
> +typedef struct ParallelCopyLineBuf
> +{
> + StringInfoData line_buf;
> + uint64 cur_lineno; /* line number for error messages */
> +}ParallelCopyLineBuf;

Why do we need separate infrastructure for this? We shouldn't duplicate
infrastructure unnecessarily.

> +/*
> + * Common information that need to be copied to shared memory.
> + */
> +typedef struct CopyWorkerCommonData
> +{

Why is the parallel-specific stuff here suddenly not named ParallelCopy*
anymore? If you introduce a naming convention like that, IMO it should be
used consistently.

> + /* low-level state data */
> + CopyDest copy_dest; /* type of copy source/destination */
> + int file_encoding; /* file or remote side's character encoding */
> + bool need_transcoding; /* file encoding diff from server? */
> + bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
> +
> + /* parameters from the COPY command */
> + bool csv_mode; /* Comma Separated Value format? */
> + bool header_line; /* CSV header line? */
> + int null_print_len; /* length of same */
> + bool force_quote_all; /* FORCE_QUOTE *? */
> + bool convert_selectively; /* do selective binary conversion? */
> +
> + /* Working state for COPY FROM */
> + AttrNumber num_defaults;
> + Oid relid;
> +}CopyWorkerCommonData;

But I actually think we shouldn't have this information in two different
structs. This should exist once, independent of using parallel /
non-parallel copy.
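Here is a sketch of what "exists once" could look like, with hypothetical names (`CopyCommonOptions`, `CopyStateSketch`, `serialize_opts`, `restore_opts`). The options needed by both paths live in one struct, which the backend-local state embeds. Transferring them from leader to worker is then a plain copy of that struct.

As written, this only works for scalar fields. Pointer-valued options, such as the NULL string, would need separate serialization into the DSM.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical: the COPY options both code paths need, declared once. */
typedef struct CopyCommonOptions
{
    int  file_encoding;         /* file or remote side's character encoding */
    bool need_transcoding;      /* file encoding diff from server? */
    bool csv_mode;              /* Comma Separated Value format? */
    bool header_line;           /* CSV header line? */
    int  null_print_len;        /* length of the NULL string */
} CopyCommonOptions;

/* Backend-local state embeds the struct rather than duplicating fields. */
typedef struct CopyStateSketch
{
    CopyCommonOptions opts;
    bool is_parallel;
} CopyStateSketch;

/* Leader: the shared-memory image is just the embedded struct. */
static void
serialize_opts(const CopyStateSketch *cstate, void *shm_area)
{
    assert(shm_area != NULL);
    memcpy(shm_area, &cstate->opts, sizeof(CopyCommonOptions));
}

/* Worker: restore the same struct; no field-by-field mapping to maintain. */
static void
restore_opts(CopyStateSketch *cstate, const void *shm_area)
{
    memcpy(&cstate->opts, shm_area, sizeof(CopyCommonOptions));
    cstate->is_parallel = true;
}
```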

> +/* List information */
> +typedef struct ListInfo
> +{
> + int count; /* count of attributes */
> +
> + /* string info in the form info followed by info1, info2... infon */
> + char info[1];
> +} ListInfo;

Based on these comments I have no idea what this could be for.

> /*
> - * This keeps the character read at the top of the loop in the buffer
> - * even if there is more than one read-ahead.
> + * This keeps the character read at the top of the loop in the buffer
> + * even if there is more than one read-ahead.
> + */
> +#define IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(extralen) \
> +if (1) \
> +{ \
> + if (copy_buff_state.raw_buf_ptr + (extralen) >= copy_buff_state.copy_buf_len && !hit_eof) \
> + { \
> + if (IsParallelCopy()) \
> + { \
> + copy_buff_state.chunk_size = prev_chunk_size; /* update previous chunk size */ \
> + if (copy_buff_state.block_switched) \
> + { \
> + pg_atomic_sub_fetch_u32(&copy_buff_state.data_blk_ptr->unprocessed_chunk_parts, 1); \
> + copy_buff_state.copy_buf_len = prev_copy_buf_len; \
> + } \
> + } \
> + copy_buff_state.raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
> + need_data = true; \
> + continue; \
> + } \
> +} else ((void) 0)

I think it's an absolutely clear no-go to add new branches to
these. They're *really* hot already, and this is going to sprinkle a
significant amount of new instructions over a lot of places.
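The parallel and serial paths may genuinely need different behavior inside such a loop. One common way to avoid a runtime test per iteration is to write the loop once as an always-inline function that takes a constant flag, and call it from two thin wrappers. After inlining, the compiler can fold the flag and drop the dead branch from each copy.

The sketch uses a hypothetical newline scanner (`scan_lines_impl` and its wrappers), not code from copy.c. `__attribute__((always_inline))` is the GCC/Clang spelling; PostgreSQL's c.h wraps it as `pg_attribute_always_inline`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Loop body written once. Callers pass a constant is_parallel, so once this
 * is inlined the compiler can fold the flag and drop the dead branch; the
 * serial copy of the loop need not carry any parallel-only test.
 */
static inline __attribute__((always_inline)) size_t
scan_lines_impl(const char *buf, size_t len, bool is_parallel,
                size_t *boundaries, size_t max_boundaries)
{
    size_t nlines = 0;

    for (size_t i = 0; i < len; i++)
    {
        if (buf[i] != '\n')
            continue;
        /* parallel-only work, e.g. recording where the next chunk starts */
        if (is_parallel && nlines < max_boundaries)
            boundaries[nlines] = i + 1;
        nlines++;
    }
    return nlines;
}

/* Two thin, specialized entry points; the flag is a compile-time constant. */
size_t
scan_lines_serial(const char *buf, size_t len)
{
    return scan_lines_impl(buf, len, false, NULL, 0);
}

size_t
scan_lines_parallel(const char *buf, size_t len,
                    size_t *boundaries, size_t max_boundaries)
{
    assert(boundaries != NULL || max_boundaries == 0);
    return scan_lines_impl(buf, len, true, boundaries, max_boundaries);
}
```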

> +/*
> + * SET_RAWBUF_FOR_LOAD - Set raw_buf to the shared memory where the file data must
> + * be read.
> + */
> +#define SET_RAWBUF_FOR_LOAD() \
> +{ \
> + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> + uint32 cur_block_pos; \
> + /* \
> + * Mark the previous block as completed, worker can start copying this data. \
> + */ \
> + if (copy_buff_state.data_blk_ptr != copy_buff_state.curr_data_blk_ptr && \
> + copy_buff_state.data_blk_ptr->curr_blk_completed == false) \
> + copy_buff_state.data_blk_ptr->curr_blk_completed = true; \
> + \
> + copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> + cur_block_pos = WaitGetFreeCopyBlock(pcshared_info); \
> + copy_buff_state.curr_data_blk_ptr = &pcshared_info->data_blocks[cur_block_pos]; \
> + \
> + if (!copy_buff_state.data_blk_ptr) \
> + { \
> + copy_buff_state.data_blk_ptr = copy_buff_state.curr_data_blk_ptr; \
> + chunk_first_block = cur_block_pos; \
> + } \
> + else if (need_data == false) \
> + copy_buff_state.data_blk_ptr->following_block = cur_block_pos; \
> + \
> + cstate->raw_buf = copy_buff_state.curr_data_blk_ptr->data; \
> + copy_buff_state.copy_raw_buf = cstate->raw_buf; \
> +}
> +
> +/*
> + * END_CHUNK_PARALLEL_COPY - Update the chunk information in shared memory.
> + */
> +#define END_CHUNK_PARALLEL_COPY() \
> +{ \
> + if (!IsHeaderLine()) \
> + { \
> + ShmCopyInfo *pcshared_info = cstate->pcdata->pcshared_info; \
> + ChunkBoundaries *chunkBoundaryPtr = &pcshared_info->chunk_boundaries; \
> + if (copy_buff_state.chunk_size) \
> + { \
> + ChunkBoundary *chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> + /* \
> + * If raw_buf_ptr is zero, unprocessed_chunk_parts would have been \
> + * incremented in SEEK_COPY_BUFF_POS. This will happen if the whole \
> + * chunk finishes at the end of the current block. If the \
> + * new_line_size > raw_buf_ptr, then the new block has only new line \
> + * char content. The unprocessed count should not be increased in \
> + * this case. \
> + */ \
> + if (copy_buff_state.raw_buf_ptr != 0 && \
> + copy_buff_state.raw_buf_ptr > new_line_size) \
> + pg_atomic_add_fetch_u32(&copy_buff_state.curr_data_blk_ptr->unprocessed_chunk_parts, 1); \
> + \
> + /* Update chunk size. */ \
> + pg_atomic_write_u32(&chunkInfo->chunk_size, copy_buff_state.chunk_size); \
> + pg_atomic_write_u32(&chunkInfo->chunk_state, CHUNK_LEADER_POPULATED); \
> + elog(DEBUG1, "[Leader] After adding - chunk position:%d, chunk_size:%d", \
> + chunk_pos, copy_buff_state.chunk_size); \
> + pcshared_info->populated++; \
> + } \
> + else if (new_line_size) \
> + { \
> + /* \
> + * This means only new line char, empty record should be \
> + * inserted. \
> + */ \
> + ChunkBoundary *chunkInfo; \
> + chunk_pos = UpdateBlockInChunkInfo(cstate, -1, -1, 0, \
> + CHUNK_LEADER_POPULATED); \
> + chunkInfo = &chunkBoundaryPtr->ring[chunk_pos]; \
> + elog(DEBUG1, "[Leader] Added empty chunk with offset:%d, chunk position:%d, chunk size:%d", \
> + chunkInfo->start_offset, chunk_pos, \
> + pg_atomic_read_u32(&chunkInfo->chunk_size)); \
> + pcshared_info->populated++; \
> + } \
> + }\
> + \
> + /*\
> + * All of the read data is processed, reset index & len. In the\
> + * subsequent read, we will get a new block and copy data in to the\
> + * new block.\
> + */\
> + if (copy_buff_state.raw_buf_ptr == copy_buff_state.copy_buf_len)\
> + {\
> + cstate->raw_buf_index = 0;\
> + cstate->raw_buf_len = 0;\
> + }\
> + else\
> + cstate->raw_buf_len = copy_buff_state.copy_buf_len;\
> +}

Why are these macros? They are way way way above a length where that
makes any sort of sense.
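For comparison, here is a trimmed-down static inline function version of the tail of END_CHUNK_PARALLEL_COPY. It uses hypothetical stand-in structs (`CopyBuffState`, `LeaderState`) and a hypothetical `end_chunk`.

The state that the macro silently picks up from the caller's scope (`copy_buff_state`, `cstate`) becomes explicit parameters, and the arguments are type-checked. The compiler still decides whether inlining pays off. Chunk publication and the empty-record case are reduced to a counter here.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for the patch's copy_buff_state. */
typedef struct CopyBuffState
{
    uint32_t raw_buf_ptr;
    uint32_t copy_buf_len;
    uint32_t chunk_size;
} CopyBuffState;

/* Hypothetical stand-in for the leader-side fields of cstate. */
typedef struct LeaderState
{
    uint32_t raw_buf_index;
    uint32_t raw_buf_len;
    uint32_t populated;
} LeaderState;

static inline void
end_chunk(LeaderState *ls, const CopyBuffState *cb)
{
    assert(cb->raw_buf_ptr <= cb->copy_buf_len);

    if (cb->chunk_size > 0)
        ls->populated++;        /* chunk published to the workers */

    /* All read data is processed: the next read starts in a new block. */
    if (cb->raw_buf_ptr == cb->copy_buf_len)
    {
        ls->raw_buf_index = 0;
        ls->raw_buf_len = 0;
    }
    else
        ls->raw_buf_len = cb->copy_buf_len;
}
```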

Greetings,

Andres Freund
