diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index dada2ecd1e..f7946a39fd 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -223,6 +223,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +static BlockNumber +heap_scan_stream_read_next(ReadStream *pgsr, void *private_data, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) private_data; + + if (unlikely(!scan->rs_inited)) + { + scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir); + scan->rs_inited = true; + } + else + scan->rs_prefetch_block = heapgettup_advance_block(scan, + scan->rs_prefetch_block, + scan->rs_dir); + + return scan->rs_prefetch_block; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -325,6 +344,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + /* + * Initialize to ForwardScanDirection because it is most common and heap + * scans usually must go forwards before going backward. + */ + scan->rs_dir = ForwardScanDirection; + scan->rs_prefetch_block = InvalidBlockNumber; + /* page-at-a-time fields are always invalid when not rs_inited */ /* @@ -462,12 +488,14 @@ heap_prepare_pagescan(TableScanDesc sscan) /* * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM. * - * Read the next block of the scan relation into a buffer and pin that buffer - * before saving it in the scan descriptor. + * Read the next block of the scan relation from the read stream and pin that + * buffer before saving it in the scan descriptor. */ static inline void heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) { + Assert(scan->rs_read_stream); + /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -482,25 +510,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) */ CHECK_FOR_INTERRUPTS(); - if (unlikely(!scan->rs_inited)) + /* + * If the scan direction is changing, reset the prefetch block to the + * current block. Otherwise, we will incorrectly prefetch the blocks + * between the prefetch block and the current block again before + * prefetching blocks in the new, correct scan direction. + */ + if (unlikely(scan->rs_dir != dir)) { - scan->rs_cblock = heapgettup_initial_block(scan, dir); + scan->rs_prefetch_block = scan->rs_cblock; + read_stream_reset(scan->rs_read_stream); + } - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(scan->rs_cblock != InvalidBlockNumber || - !BufferIsValid(scan->rs_cbuf)); + scan->rs_dir = dir; - scan->rs_inited = true; - } - else - scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, - dir); - - /* read block if valid */ - if (BlockNumberIsValid(scan->rs_cblock)) - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, - scan->rs_cblock, RBM_NORMAL, - scan->rs_strategy); + scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); + if (BufferIsValid(scan->rs_cbuf)) + scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } /* @@ -833,6 +859,7 @@ continue_page: scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -928,6 +955,7 @@ continue_page: ReleaseBuffer(scan->rs_cbuf); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -1021,6 +1049,28 @@ heap_beginscan(Relation relation, Snapshot snapshot, initscan(scan, key, false); + scan->rs_read_stream = NULL; + + /* + * For sequential scans and TID range scans, we will set up a read stream. + * We do not know the scan direction yet. If the scan does not end up + * being a forward scan, the read stream will be freed. This should be + * done after initscan() because initscan() allocates the + * BufferAccessStrategy object. + */ + if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN || + scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN) + { + scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL, + scan->rs_strategy, + scan->rs_base.rs_rd, + MAIN_FORKNUM, + heap_scan_stream_read_next, + scan, + 0); + } + + return (TableScanDesc) scan; } @@ -1055,6 +1105,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + /* + * The read stream is reset on rescan. This must be done before + * initscan(), as some state referred to by read_stream_reset() is reset + * in initscan(). + */ + if (scan->rs_read_stream) + read_stream_reset(scan->rs_read_stream); + /* * reinitialize scan descriptor */ @@ -1074,6 +1132,12 @@ heap_endscan(TableScanDesc sscan) if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + /* + * Must free the read stream before freeing the BufferAccessStrategy. + */ + if (scan->rs_read_stream) + read_stream_end(scan->rs_read_stream); + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 2765efc4e5..332a7faa8d 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -25,6 +25,7 @@ #include "storage/bufpage.h" #include "storage/dsm.h" #include "storage/lockdefs.h" +#include "storage/read_stream.h" #include "storage/shm_toc.h" #include "utils/relcache.h" #include "utils/snapshot.h" @@ -70,6 +71,20 @@ typedef struct HeapScanDescData HeapTupleData rs_ctup; /* current tuple in scan, if any */ + /* For scans that stream reads */ + ReadStream *rs_read_stream; + + /* + * For sequential scans and TID range scans to stream reads. The read + * stream is allocated at the beginning of the scan and reset on rescan or + * when the scan direction changes. The scan direction is saved each time + * a new page is requested. If the scan direction changes from one page to + * the next, the read stream releases all previously pinned buffers and + * resets the prefetch block. + */ + ScanDirection rs_dir; + BlockNumber rs_prefetch_block; + /* * For parallel scans to store page allocation data. NULL when not * performing a parallel scan.