diff --git a/contrib/pg_visibility/pg_visibility.c b/contrib/pg_visibility/pg_visibility.c index 773ba92..ac575a1 100644 --- a/contrib/pg_visibility/pg_visibility.c +++ b/contrib/pg_visibility/pg_visibility.c @@ -41,6 +41,20 @@ typedef struct corrupt_items ItemPointer tids; } corrupt_items; +/* + * Helper struct for read stream object used in + * collect_corrupt_items() function. + */ +struct collect_corrupt_items_read_stream_private +{ + bool all_frozen; + bool all_visible; + BlockNumber blocknum; + BlockNumber nblocks; + Relation rel; + Buffer *vmbuffer; +}; + PG_FUNCTION_INFO_V1(pg_visibility_map); PG_FUNCTION_INFO_V1(pg_visibility_map_rel); PG_FUNCTION_INFO_V1(pg_visibility); @@ -611,6 +625,35 @@ GetStrictOldestNonRemovableTransactionId(Relation rel) } /* + * Callback function to get next block for read stream object used in + * collect_corrupt_items() function. + */ +static BlockNumber +collect_corrupt_items_read_stream_next_block(ReadStream *stream, + void *callback_private_data, + void *per_buffer_data) +{ + struct collect_corrupt_items_read_stream_private *p = callback_private_data; + + for (; p->blocknum < p->nblocks; p->blocknum++) + { + bool check_frozen = false; + bool check_visible = false; + + if (p->all_frozen && VM_ALL_FROZEN(p->rel, p->blocknum, p->vmbuffer)) + check_frozen = true; + if (p->all_visible && VM_ALL_VISIBLE(p->rel, p->blocknum, p->vmbuffer)) + check_visible = true; + if (!check_visible && !check_frozen) + continue; + + return p->blocknum++; + } + + return InvalidBlockNumber; +} + +/* * Returns a list of items whose visibility map information does not match * the status of the tuples on the page. * @@ -634,6 +677,10 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) Buffer vmbuffer = InvalidBuffer; BufferAccessStrategy bstrategy = GetAccessStrategy(BAS_BULKREAD); TransactionId OldestXmin = InvalidTransactionId; + struct collect_corrupt_items_read_stream_private p; + ReadStream *stream; + + Assert(all_visible || all_frozen); rel = relation_open(relid, AccessShareLock); @@ -658,11 +705,25 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) items->count = 64; items->tids = palloc(items->count * sizeof(ItemPointerData)); + p.blocknum = 0; + p.nblocks = nblocks; + p.rel = rel; + p.vmbuffer = &vmbuffer; + p.all_frozen = all_frozen; + p.all_visible = all_visible; + stream = read_stream_begin_relation(READ_STREAM_FULL, + bstrategy, + rel, + MAIN_FORKNUM, + collect_corrupt_items_read_stream_next_block, + &p, + 0); + /* Loop over every block in the relation. */ for (blkno = 0; blkno < nblocks; ++blkno) { - bool check_frozen = false; - bool check_visible = false; + bool check_frozen = all_frozen; + bool check_visible = all_visible; Buffer buffer; Page page; OffsetNumber offnum, @@ -671,17 +732,20 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) /* Make sure we are interruptible. */ CHECK_FOR_INTERRUPTS(); - /* Use the visibility map to decide whether to check this page. */ - if (all_frozen && VM_ALL_FROZEN(rel, blkno, &vmbuffer)) - check_frozen = true; - if (all_visible && VM_ALL_VISIBLE(rel, blkno, &vmbuffer)) - check_visible = true; - if (!check_visible && !check_frozen) - continue; - /* Read and lock the page. */ - buffer = ReadBufferExtended(rel, MAIN_FORKNUM, blkno, RBM_NORMAL, - bstrategy); + buffer = read_stream_next_buffer(stream, NULL); + + /* + * If the read stream returns an InvalidBuffer, this means all the + * blocks are processed. So, end the stream and loop. + */ + if (buffer == InvalidBuffer) + { + read_stream_end(stream); + stream = NULL; + break; + } + LockBuffer(buffer, BUFFER_LOCK_SHARE); page = BufferGetPage(buffer); @@ -778,6 +842,11 @@ collect_corrupt_items(Oid relid, bool all_visible, bool all_frozen) UnlockReleaseBuffer(buffer); } + if (stream) + { + Assert(read_stream_next_buffer(stream, NULL) == InvalidBuffer); + read_stream_end(stream); + } /* Clean up. */ if (vmbuffer != InvalidBuffer)