From 9e90e1a41cb1cbeaa99528a9b7be75f9bf9294c4 Mon Sep 17 00:00:00 2001 From: Kirill Bychik Date: Tue, 17 Mar 2020 14:41:50 +0100 Subject: [PATCH v7 1/4] Add infrastructure to track WAL usage. Author: Kirill Bychik, Julien Rouhaud Reviewed-by: Fuji Masao Discussion: https://postgr.es/m/CAB-hujrP8ZfUkvL5OYETipQwA=e3n7oqHFU=4ZLxWS_Cza3kQQ@mail.gmail.com --- src/backend/access/transam/xlog.c | 11 ++++++ src/backend/access/transam/xloginsert.c | 1 + src/backend/executor/execParallel.c | 38 ++++++++++++++----- src/backend/executor/instrument.c | 50 ++++++++++++++++++++++--- src/include/executor/execParallel.h | 1 + src/include/executor/instrument.h | 19 +++++++++- src/tools/pgindent/typedefs.list | 1 + 7 files changed, 103 insertions(+), 18 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 8fe92962b0..f270b4a0e5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -42,6 +42,7 @@ #include "commands/progress.h" #include "commands/tablespace.h" #include "common/controldata_utils.h" +#include "executor/instrument.h" #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" @@ -1249,6 +1250,16 @@ XLogInsertRecord(XLogRecData *rdata, ProcLastRecPtr = StartPos; XactLastRecEnd = EndPos; + /* Provide WAL update data to the instrumentation */ + if (inserted) + { + pgWalUsage.wal_bytes += rechdr->xl_tot_len; + if (doPageWrites && fpw_lsn <= RedoRecPtr) + pgWalUsage.wal_fpw_records++; + else + pgWalUsage.wal_records++; + } + return EndPos; } diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index a618dec776..bb2290d636 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -25,6 +25,7 @@ #include "access/xloginsert.h" #include "catalog/pg_control.h" #include "common/pg_lzcompress.h" +#include "executor/instrument.h" #include "miscadmin.h" #include "pg_trace.h" #include "replication/origin.h" diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index a753d6efa0..7d9ca66fc8 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -12,7 +12,7 @@ * workers and ensuring that their state generally matches that of the * leader; see src/backend/access/transam/README.parallel for details. * However, we must save and restore relevant executor state, such as - * any ParamListInfo associated with the query, buffer usage info, and + * any ParamListInfo associated with the query, buffer/WAL usage info, and * the actual plan to be passed down to the worker. * * IDENTIFICATION @@ -62,6 +62,7 @@ #define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007) #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) +#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -573,6 +574,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_space; char *paramlistinfo_space; BufferUsage *bufusage_space; + WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -646,6 +648,13 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(sizeof(BufferUsage), pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* + * Same thing for WalUsage. + */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for tuple queues. */ shm_toc_estimate_chunk(&pcxt->estimator, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); @@ -728,6 +737,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space); pei->buffer_usage = bufusage_space; + /* Same for WalUsage. */ + walusage_space = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(WalUsage), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space); + pei->wal_usage = walusage_space; + /* Set up the tuple queues that the workers will write into. */ pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false); @@ -1069,7 +1084,7 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate, /* * Finish parallel execution. We wait for parallel workers to finish, and - * accumulate their buffer usage. + * accumulate their buffer/WAL usage. */ void ExecParallelFinish(ParallelExecutorInfo *pei) @@ -1109,11 +1124,11 @@ ExecParallelFinish(ParallelExecutorInfo *pei) WaitForParallelWorkersToFinish(pei->pcxt); /* - * Next, accumulate buffer usage. (This must wait for the workers to - * finish, or we might get incomplete data.) + * Next, accumulate buffer/WAL usage. (This must wait for the workers + * to finish, or we might get incomplete data.) */ for (i = 0; i < nworkers; i++) - InstrAccumParallelQuery(&pei->buffer_usage[i]); + InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); pei->finished = true; } @@ -1333,6 +1348,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) { FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; + WalUsage *wal_usage; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1386,11 +1402,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate); /* - * Prepare to track buffer usage during query execution. + * Prepare to track buffer/WAL usage during query execution. * * We do this after starting up the executor to match what happens in the - * leader, which also doesn't count buffer accesses that occur during - * executor startup. + * leader, which also doesn't count buffer accesses and WAL activity that + * occur during executor startup. */ InstrStartParallelQuery(); @@ -1406,9 +1422,11 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) /* Shut down the executor */ ExecutorFinish(queryDesc); - /* Report buffer usage during parallel execution. */ + /* Report buffer/WAL usage during parallel execution. */ buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false); - InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]); + wal_usage = shm_toc_lookup (toc, PARALLEL_KEY_WAL_USAGE, false); + InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], + &wal_usage[ParallelWorkerNumber]); /* Report instrumentation data if any instrumentation options are set. */ if (instrumentation != NULL) diff --git a/src/backend/executor/instrument.c b/src/backend/executor/instrument.c index bc1d42bf64..d2515b0a4c 100644 --- a/src/backend/executor/instrument.c +++ b/src/backend/executor/instrument.c @@ -24,6 +24,10 @@ static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add); static void BufferUsageAccumDiff(BufferUsage *dst, const BufferUsage *add, const BufferUsage *sub); +WalUsage pgWalUsage; +static WalUsage save_pgWalUsage; + +static void WalUsageAdd(WalUsage *dst, WalUsage *add); /* Allocate new instrumentation structure(s) */ Instrumentation * @@ -33,15 +37,17 @@ InstrAlloc(int n, int instrument_options) /* initialize all fields to zeroes, then modify as needed */ instr = palloc0(n * sizeof(Instrumentation)); - if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER)) + if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL)) { bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0; + bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0; bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; int i; for (i = 0; i < n; i++) { instr[i].need_bufusage = need_buffers; + instr[i].need_walusage = need_wal; instr[i].need_timer = need_timer; } } @@ -55,6 +61,7 @@ InstrInit(Instrumentation *instr, int instrument_options) { memset(instr, 0, sizeof(Instrumentation)); instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0; + instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0; instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0; } @@ -69,6 +76,9 @@ InstrStartNode(Instrumentation *instr) /* save buffer usage totals at node entry, if needed */ if (instr->need_bufusage) instr->bufusage_start = pgBufferUsage; + + if (instr->need_walusage) + instr->walusage_start = pgWalUsage; } /* Exit from a plan node */ @@ -97,6 +107,10 @@ InstrStopNode(Instrumentation *instr, double nTuples) BufferUsageAccumDiff(&instr->bufusage, &pgBufferUsage, &instr->bufusage_start); + if (instr->need_walusage) + WalUsageAccumDiff(&instr->walusage, + &pgWalUsage, &instr->walusage_start); + /* Is this the first tuple of this cycle? */ if (!instr->running) { @@ -160,6 +174,9 @@ InstrAggNode(Instrumentation *dst, Instrumentation *add) /* Add delta of buffer usage since entry to node's totals */ if (dst->need_bufusage) BufferUsageAdd(&dst->bufusage, &add->bufusage); + + if (dst->need_walusage) + WalUsageAdd(&dst->walusage, &add->walusage); } /* note current values during parallel executor startup */ @@ -167,21 +184,25 @@ void InstrStartParallelQuery(void) { save_pgBufferUsage = pgBufferUsage; + save_pgWalUsage = pgWalUsage; } /* report usage after parallel executor shutdown */ void -InstrEndParallelQuery(BufferUsage *result) +InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage) { - memset(result, 0, sizeof(BufferUsage)); - BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage); + memset(bufusage, 0, sizeof(BufferUsage)); + memset(walusage, 0, sizeof(WalUsage)); + BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage); + WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage); } /* accumulate work done by workers in leader's stats */ void -InstrAccumParallelQuery(BufferUsage *result) +InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage) { - BufferUsageAdd(&pgBufferUsage, result); + BufferUsageAdd(&pgBufferUsage, bufusage); + WalUsageAdd(&pgWalUsage, walusage); } /* dst += add */ @@ -223,3 +244,20 @@ BufferUsageAccumDiff(BufferUsage *dst, INSTR_TIME_ACCUM_DIFF(dst->blk_write_time, add->blk_write_time, sub->blk_write_time); } + +/* helper functions for WAL usage accumulation */ +static void +WalUsageAdd(WalUsage *dst, WalUsage *add) +{ + dst->wal_bytes += add->wal_bytes; + dst->wal_records += add->wal_records; + dst->wal_fpw_records += add->wal_fpw_records; +} + +void +WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub) +{ + dst->wal_bytes += add->wal_bytes - sub->wal_bytes; + dst->wal_records += add->wal_records - sub->wal_records; + dst->wal_fpw_records += add->wal_fpw_records - sub->wal_fpw_records; +} diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 17d07cf020..1cc5b524fd 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -26,6 +26,7 @@ typedef struct ParallelExecutorInfo PlanState *planstate; /* plan subtree we're running in parallel */ ParallelContext *pcxt; /* parallel context we're using */ BufferUsage *buffer_usage; /* points to bufusage area in DSM */ + WalUsage *wal_usage; /* walusage area in DSM */ SharedExecutorInstrumentation *instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ diff --git a/src/include/executor/instrument.h b/src/include/executor/instrument.h index f48d46aede..23264dd396 100644 --- a/src/include/executor/instrument.h +++ b/src/include/executor/instrument.h @@ -32,12 +32,21 @@ typedef struct BufferUsage instr_time blk_write_time; /* time spent writing */ } BufferUsage; +typedef struct WalUsage +{ + long wal_records; /* # of WAL records produced */ + long wal_fpw_records; /* # of full page write WAL records + * produced */ + uint64 wal_bytes; /* size of WAL records produced */ +} WalUsage; + /* Flag bits included in InstrAlloc's instrument_options bitmask */ typedef enum InstrumentOption { INSTRUMENT_TIMER = 1 << 0, /* needs timer (and row counts) */ INSTRUMENT_BUFFERS = 1 << 1, /* needs buffer usage */ INSTRUMENT_ROWS = 1 << 2, /* needs row count */ + INSTRUMENT_WAL = 1 << 3, /* needs WAL usage */ INSTRUMENT_ALL = PG_INT32_MAX } InstrumentOption; @@ -46,6 +55,7 @@ typedef struct Instrumentation /* Parameters set at node creation: */ bool need_timer; /* true if we need timer data */ bool need_bufusage; /* true if we need buffer usage data */ + bool need_walusage; /* true if we need WAL usage data */ /* Info about current plan cycle: */ bool running; /* true if we've completed first tuple */ instr_time starttime; /* start time of current iteration of node */ @@ -53,6 +63,7 @@ typedef struct Instrumentation double firsttuple; /* time for first tuple of this cycle */ double tuplecount; /* # of tuples emitted so far this cycle */ BufferUsage bufusage_start; /* buffer usage at start */ + WalUsage walusage_start; /* WAL usage at start */ /* Accumulated statistics across all completed cycles: */ double startup; /* total startup time (in seconds) */ double total; /* total time (in seconds) */ @@ -62,6 +73,7 @@ typedef struct Instrumentation double nfiltered1; /* # of tuples removed by scanqual or joinqual */ double nfiltered2; /* # of tuples removed by "other" quals */ BufferUsage bufusage; /* total buffer usage */ + WalUsage walusage; /* total WAL usage */ } Instrumentation; typedef struct WorkerInstrumentation @@ -71,6 +83,7 @@ typedef struct WorkerInstrumentation } WorkerInstrumentation; extern PGDLLIMPORT BufferUsage pgBufferUsage; +extern PGDLLIMPORT WalUsage pgWalUsage; extern Instrumentation *InstrAlloc(int n, int instrument_options); extern void InstrInit(Instrumentation *instr, int instrument_options); @@ -79,7 +92,9 @@ extern void InstrStopNode(Instrumentation *instr, double nTuples); extern void InstrEndLoop(Instrumentation *instr); extern void InstrAggNode(Instrumentation *dst, Instrumentation *add); extern void InstrStartParallelQuery(void); -extern void InstrEndParallelQuery(BufferUsage *result); -extern void InstrAccumParallelQuery(BufferUsage *result); +extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage); +extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage); +extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, + const WalUsage *sub); #endif /* INSTRUMENT_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ca2d9ec8fb..1eef542f06 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2635,6 +2635,7 @@ WalSndCtlData WalSndSendDataCallback WalSndState WalTimeSample +WalUsage WalWriteMethod Walfile WindowAgg -- 2.20.1