Skip to content

Commit 9f7123d

Browse files
committed
Add pg_stat_wait extension
1 parent 56dcbf7 commit 9f7123d

15 files changed

+1730
-159
lines changed

Diff for: contrib/pg_stat_wait/Makefile

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# contrib/pg_stat_wait/Makefile
2+
3+
MODULE_big = pg_stat_wait
4+
OBJS = pg_stat_wait.o collector.o
5+
6+
EXTENSION = pg_stat_wait
7+
DATA = pg_stat_wait--1.0.sql
8+
PG_CPPFLAGS = -DPG_STAT_WAIT_TESTS
9+
EXTRA_CLEAN = $(pg_regress_clean_files) ./regression_output ./isolation_output
10+
EXTRA_INSTALL=contrib/pg_stat_wait
11+
12+
ifdef USE_PGXS
13+
PG_CONFIG = pg_config
14+
PGXS := $(shell $(PG_CONFIG) --pgxs)
15+
include $(PGXS)
16+
else
17+
subdir = contrib/pg_stat_wait
18+
top_builddir = ../..
19+
include $(top_builddir)/src/Makefile.global
20+
include $(top_srcdir)/contrib/contrib-global.mk
21+
endif
22+
23+
check: regresscheck isolationcheck
24+
25+
submake-regress:
26+
$(MAKE) -C $(top_builddir)/src/test/regress all
27+
28+
submake-isolation:
29+
$(MAKE) -C $(top_builddir)/src/test/isolation all
30+
31+
submake-pg_stat_wait:
32+
$(MAKE) -C $(top_builddir)/contrib/pg_stat_wait
33+
34+
REGRESSCHECKS=file_trace descriptions
35+
36+
regresscheck: all | submake-regress submake-pg_stat_wait
37+
$(MKDIR_P) regression_output
38+
$(pg_regress_check) \
39+
--temp-instance=./tmp_check \
40+
--load-extension=pg_stat_wait \
41+
--outputdir=./regression_output \
42+
$(REGRESSCHECKS)
43+
44+
ISOLATIONCHECKS=history
45+
46+
isolationcheck: all | submake-isolation submake-pg_stat_wait
47+
$(MKDIR_P) isolation_output
48+
$(pg_isolation_regress_check) \
49+
--temp-instance=./tmp_check \
50+
--temp-config=./waits.conf \
51+
--load-extension=pg_stat_wait \
52+
--outputdir=./isolation_output \
53+
$(ISOLATIONCHECKS)
54+

Diff for: contrib/pg_stat_wait/collector.c

+290
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
#include "postgres.h"
2+
3+
#include "access/htup_details.h"
4+
#include "catalog/pg_type.h"
5+
#include "funcapi.h"
6+
#include "miscadmin.h"
7+
#include "postmaster/bgworker.h"
8+
#include "storage/ipc.h"
9+
#include "storage/procarray.h"
10+
#include "storage/procsignal.h"
11+
#include "storage/s_lock.h"
12+
#include "storage/shm_mq.h"
13+
#include "storage/shm_toc.h"
14+
#include "storage/spin.h"
15+
#include "storage/wait.h"
16+
#include "utils/guc.h"
17+
#include "utils/memutils.h"
18+
#include "utils/resowner.h"
19+
#include "portability/instr_time.h"
20+
21+
#include "pg_stat_wait.h"
22+
23+
CollectorShmqHeader *hdr;
24+
25+
static void *pgsw;
26+
shm_toc *toc;
27+
shm_mq *mq;
28+
static volatile sig_atomic_t shutdown_requested = false;
29+
30+
static void handle_sigterm(SIGNAL_ARGS);
31+
static void collector_main(Datum main_arg);
32+
33+
/*
34+
* Estimate shared memory space needed.
35+
*/
36+
Size
37+
CollectorShmemSize(void)
38+
{
39+
shm_toc_estimator e;
40+
Size size;
41+
42+
shm_toc_initialize_estimator(&e);
43+
shm_toc_estimate_chunk(&e, sizeof(CollectorShmqHeader));
44+
shm_toc_estimate_chunk(&e, (Size) COLLECTOR_QUEUE_SIZE);
45+
shm_toc_estimate_keys(&e, 2);
46+
size = shm_toc_estimate(&e);
47+
48+
return size;
49+
}
50+
51+
void
52+
AllocateCollectorMem(void)
53+
{
54+
bool found;
55+
Size segsize= CollectorShmemSize();
56+
57+
pgsw = ShmemInitStruct("pg_stat_wait", segsize, &found);
58+
59+
if (!found)
60+
{
61+
void *mq_mem;
62+
63+
toc = shm_toc_create(PG_STAT_WAIT_MAGIC, pgsw, segsize);
64+
hdr = shm_toc_allocate(toc, sizeof(CollectorShmqHeader));
65+
shm_toc_insert(toc, 0, hdr);
66+
67+
mq_mem = shm_toc_allocate(toc, COLLECTOR_QUEUE_SIZE);
68+
shm_toc_insert(toc, 1, mq_mem);
69+
70+
DefineCustomIntVariable("pg_stat_wait.history_size",
71+
"Sets size of waits history.", NULL,
72+
&hdr->historySize, 5000, 100, INT_MAX,
73+
PGC_SUSET, 0, NULL, NULL, NULL);
74+
75+
DefineCustomIntVariable("pg_stat_wait.history_period",
76+
"Sets period of waits history sampling.", NULL,
77+
&hdr->historyPeriod, 10, 1, INT_MAX,
78+
PGC_SUSET, 0, NULL, NULL, NULL);
79+
80+
DefineCustomBoolVariable("pg_stat_wait.history_skip_latch",
81+
"Skip latch events in waits history", NULL,
82+
&hdr->historySkipLatch, false, PGC_SUSET, 0, NULL, NULL, NULL);
83+
}
84+
else
85+
{
86+
toc = shm_toc_attach(PG_STAT_WAIT_MAGIC, pgsw);
87+
hdr = shm_toc_lookup(toc, 0);
88+
}
89+
}
90+
91+
void
92+
RegisterWaitsCollector(void)
93+
{
94+
BackgroundWorker worker;
95+
96+
/* set up common data for all our workers */
97+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
98+
BGWORKER_BACKEND_DATABASE_CONNECTION;
99+
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
100+
worker.bgw_restart_time = BGW_NEVER_RESTART;
101+
worker.bgw_main = collector_main;
102+
worker.bgw_notify_pid = 0;
103+
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_stat_wait collector");
104+
worker.bgw_main_arg = (Datum)0;
105+
RegisterBackgroundWorker(&worker);
106+
}
107+
108+
void
109+
AllocHistory(History *observations, int count)
110+
{
111+
observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count);
112+
observations->index = 0;
113+
observations->count = count;
114+
observations->wraparound = false;
115+
}
116+
117+
/* Read current wait information from proc, if readCurrent is true,
118+
* then it reads from currently going wait, and can be inconsistent
119+
*/
120+
int
121+
GetCurrentWaitsState(PGPROC *proc, HistoryItem *item, int idx)
122+
{
123+
instr_time currentTime;
124+
ProcWait *wait;
125+
126+
if (idx == -1)
127+
return 0;
128+
129+
INSTR_TIME_SET_CURRENT(currentTime);
130+
wait = &proc->waits.waitsBuf[idx];
131+
item->backendPid = proc->pid;
132+
item->classId = (int)wait->classId;
133+
if (item->classId == 0)
134+
return 0;
135+
136+
item->eventId = (int)wait->eventId;
137+
138+
INSTR_TIME_SUBTRACT(currentTime, wait->startTime);
139+
item->waitTime = INSTR_TIME_GET_MICROSEC(currentTime);
140+
memcpy(item->params, wait->params, sizeof(item->params));
141+
return 1;
142+
}
143+
144+
static void
145+
handle_sigterm(SIGNAL_ARGS)
146+
{
147+
int save_errno = errno;
148+
shutdown_requested = true;
149+
if (MyProc)
150+
SetLatch(&MyProc->procLatch);
151+
errno = save_errno;
152+
}
153+
154+
/* Circulation in history */
155+
static HistoryItem *
156+
get_next_observation(History *observations)
157+
{
158+
HistoryItem *result;
159+
160+
result = &observations->items[observations->index];
161+
observations->index++;
162+
if (observations->index >= observations->count)
163+
{
164+
observations->index = 0;
165+
observations->wraparound = true;
166+
}
167+
return result;
168+
}
169+
170+
/* Gets current waits from backends */
171+
static void
172+
write_waits_history(History *observations, TimestampTz current_ts)
173+
{
174+
int i;
175+
176+
LWLockAcquire(ProcArrayLock, LW_SHARED);
177+
for (i = 0; i < ProcGlobal->allProcCount; ++i)
178+
{
179+
HistoryItem item, *observation;
180+
PGPROC *proc = &ProcGlobal->allProcs[i];
181+
int stateOk = GetCurrentWaitsState(proc, &item, proc->waits.readIdx);
182+
183+
/* mark waits as read */
184+
proc->waits.readIdx = -1;
185+
186+
if (stateOk)
187+
{
188+
if (hdr->historySkipLatch && item.classId == WAIT_LATCH)
189+
continue;
190+
191+
item.ts = current_ts;
192+
observation = get_next_observation(observations);
193+
*observation = item;
194+
}
195+
}
196+
LWLockRelease(ProcArrayLock);
197+
}
198+
199+
static void
200+
send_history(History *observations, shm_mq_handle *mqh)
201+
{
202+
int count, i;
203+
204+
if (observations->wraparound)
205+
count = observations->count;
206+
else
207+
count = observations->index;
208+
209+
shm_mq_send(mqh, sizeof(count), &count, false);
210+
for (i = 0; i < count; i++)
211+
shm_mq_send(mqh, sizeof(HistoryItem), &observations->items[i], false);
212+
}
213+
214+
static void
215+
collector_main(Datum main_arg)
216+
{
217+
shm_mq *mq;
218+
shm_mq_handle *mqh;
219+
History observations;
220+
MemoryContext old_context, collector_context;
221+
222+
/*
223+
* Establish signal handlers.
224+
*
225+
* We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
226+
* it would a normal user backend. To make that happen, we establish a
227+
* signal handler that is a stripped-down version of die(). We don't have
228+
* any equivalent of the backend's command-read loop, where interrupts can
229+
* be processed immediately, so make sure ImmediateInterruptOK is turned
230+
* off.
231+
*/
232+
pqsignal(SIGTERM, handle_sigterm);
233+
BackgroundWorkerUnblockSignals();
234+
235+
hdr->latch = &MyProc->procLatch;
236+
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_stat_wait collector");
237+
collector_context = AllocSetContextCreate(TopMemoryContext,
238+
"pg_stat_wait context",
239+
ALLOCSET_DEFAULT_MINSIZE,
240+
ALLOCSET_DEFAULT_INITSIZE,
241+
ALLOCSET_DEFAULT_MAXSIZE);
242+
old_context = MemoryContextSwitchTo(collector_context);
243+
AllocHistory(&observations, hdr->historySize);
244+
MemoryContextSwitchTo(old_context);
245+
246+
while (1)
247+
{
248+
int rc;
249+
TimestampTz current_ts;
250+
251+
ResetLatch(&MyProc->procLatch);
252+
current_ts = GetCurrentTimestamp();
253+
write_waits_history(&observations, current_ts);
254+
255+
if (shutdown_requested)
256+
break;
257+
258+
rc = WaitLatch(&MyProc->procLatch,
259+
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
260+
hdr->historyPeriod);
261+
262+
if (rc & WL_POSTMASTER_DEATH)
263+
exit(1);
264+
265+
if (hdr->request == HISTORY_REQUEST)
266+
{
267+
hdr->request = NO_REQUEST;
268+
269+
mq = (shm_mq *)shm_toc_lookup(toc, 1);
270+
shm_mq_set_sender(mq, MyProc);
271+
mqh = shm_mq_attach(mq, NULL, NULL);
272+
shm_mq_wait_for_attach(mqh);
273+
274+
if (shm_mq_get_receiver(mq) != NULL)
275+
send_history(&observations, mqh);
276+
277+
shm_mq_detach(mq);
278+
}
279+
}
280+
281+
MemoryContextReset(collector_context);
282+
283+
/*
284+
* We're done. Explicitly detach the shared memory segment so that we
285+
* don't get a resource leak warning at commit time. This will fire any
286+
* on_dsm_detach callbacks we've registered, as well. Once that's done,
287+
* we can go ahead and exit.
288+
*/
289+
proc_exit(0);
290+
}

0 commit comments

Comments
 (0)