2
2
* collector.c
3
3
* Collector of wait event history and profile.
4
4
*
5
- * Copyright (c) 2015-2016 , Postgres Professional
5
+ * Copyright (c) 2015-2025 , Postgres Professional
6
6
*
7
7
* IDENTIFICATION
8
8
* contrib/pg_wait_sampling/pg_wait_sampling.c
9
9
*/
10
10
#include "postgres.h"
11
11
12
- #include "catalog/pg_type.h"
13
- #if PG_VERSION_NUM >= 130000
14
- #include "common/hashfn.h"
15
- #endif
16
- #include "funcapi.h"
12
+ #include <signal.h>
13
+
14
+ #include "compat.h"
17
15
#include "miscadmin.h"
16
+ #include "pg_wait_sampling.h"
17
+ #include "pgstat.h"
18
18
#include "postmaster/bgworker.h"
19
19
#include "postmaster/interrupt.h"
20
20
#include "storage/ipc.h"
21
- #include "storage/procarray.h"
21
+ #include "storage/latch.h"
22
+ #include "storage/lock.h"
23
+ #include "storage/lwlock.h"
24
+ #include "storage/proc.h"
22
25
#include "storage/procsignal.h"
23
26
#include "storage/shm_mq.h"
24
- #include "storage/shm_toc .h"
25
- #include "storage/spin .h"
27
+ #include "utils/guc .h"
28
+ #include "utils/hsearch .h"
26
29
#include "utils/memutils.h"
27
30
#include "utils/resowner.h"
28
- #include "pgstat.h"
29
-
30
- #include "compat.h"
31
- #include "pg_wait_sampling.h"
31
+ #include "utils/timestamp.h"
32
32
33
33
static volatile sig_atomic_t shutdown_requested = false;
34
34
@@ -73,10 +73,10 @@ alloc_history(History *observations, int count)
73
73
static void
74
74
realloc_history (History * observations , int count )
75
75
{
76
- HistoryItem * newitems ;
77
- int copyCount ,
78
- i ,
79
- j ;
76
+ HistoryItem * newitems ;
77
+ int copyCount ,
78
+ i ,
79
+ j ;
80
80
81
81
/* Allocate new array for history */
82
82
newitems = (HistoryItem * ) palloc0 (sizeof (HistoryItem ) * count );
@@ -114,7 +114,8 @@ realloc_history(History *observations, int count)
114
114
static void
115
115
handle_sigterm (SIGNAL_ARGS )
116
116
{
117
- int save_errno = errno ;
117
+ int save_errno = errno ;
118
+
118
119
shutdown_requested = true;
119
120
if (MyProc )
120
121
SetLatch (& MyProc -> procLatch );
@@ -129,6 +130,7 @@ get_next_observation(History *observations)
129
130
{
130
131
HistoryItem * result ;
131
132
133
+ /* Check for wraparound */
132
134
if (observations -> index >= observations -> count )
133
135
{
134
136
observations -> index = 0 ;
@@ -149,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash,
149
151
{
150
152
int i ,
151
153
newSize ;
152
- TimestampTz ts = GetCurrentTimestamp ();
154
+ TimestampTz ts = GetCurrentTimestamp ();
153
155
154
156
/* Realloc waits history if needed */
155
157
newSize = pgws_historySize ;
@@ -160,9 +162,9 @@ probe_waits(History *observations, HTAB *profile_hash,
160
162
LWLockAcquire (ProcArrayLock , LW_SHARED );
161
163
for (i = 0 ; i < ProcGlobal -> allProcCount ; i ++ )
162
164
{
163
- HistoryItem item ,
164
- * observation ;
165
- PGPROC * proc = & ProcGlobal -> allProcs [i ];
165
+ HistoryItem item ,
166
+ * observation ;
167
+ PGPROC * proc = & ProcGlobal -> allProcs [i ];
166
168
167
169
if (!pgws_should_sample_proc (proc , & item .pid , & item .wait_event_info ))
168
170
continue ;
@@ -184,8 +186,8 @@ probe_waits(History *observations, HTAB *profile_hash,
184
186
/* Write to the profile if needed */
185
187
if (write_profile )
186
188
{
187
- ProfileItem * profileItem ;
188
- bool found ;
189
+ ProfileItem * profileItem ;
190
+ bool found ;
189
191
190
192
if (!profile_pid )
191
193
item .pid = 0 ;
@@ -206,15 +208,16 @@ probe_waits(History *observations, HTAB *profile_hash,
206
208
static void
207
209
send_history (History * observations , shm_mq_handle * mqh )
208
210
{
209
- Size count ,
210
- i ;
211
- shm_mq_result mq_result ;
211
+ Size count ,
212
+ i ;
213
+ shm_mq_result mq_result ;
212
214
213
215
if (observations -> wraparound )
214
216
count = observations -> count ;
215
217
else
216
218
count = observations -> index ;
217
219
220
+ /* Send array size first since receive_array expects this */
218
221
mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
219
222
if (mq_result == SHM_MQ_DETACHED )
220
223
{
@@ -226,10 +229,10 @@ send_history(History *observations, shm_mq_handle *mqh)
226
229
for (i = 0 ; i < count ; i ++ )
227
230
{
228
231
mq_result = shm_mq_send_compat (mqh ,
229
- sizeof (HistoryItem ),
230
- & observations -> items [i ],
231
- false,
232
- true);
232
+ sizeof (HistoryItem ),
233
+ & observations -> items [i ],
234
+ false,
235
+ true);
233
236
if (mq_result == SHM_MQ_DETACHED )
234
237
{
235
238
ereport (WARNING ,
@@ -246,11 +249,12 @@ send_history(History *observations, shm_mq_handle *mqh)
246
249
static void
247
250
send_profile (HTAB * profile_hash , shm_mq_handle * mqh )
248
251
{
249
- HASH_SEQ_STATUS scan_status ;
250
- ProfileItem * item ;
251
- Size count = hash_get_num_entries (profile_hash );
252
- shm_mq_result mq_result ;
252
+ HASH_SEQ_STATUS scan_status ;
253
+ ProfileItem * item ;
254
+ Size count = hash_get_num_entries (profile_hash );
255
+ shm_mq_result mq_result ;
253
256
257
+ /* Send array size first since receive_array expects this */
254
258
mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true);
255
259
if (mq_result == SHM_MQ_DETACHED )
256
260
{
@@ -281,10 +285,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
281
285
static HTAB *
282
286
make_profile_hash ()
283
287
{
284
- HASHCTL hash_ctl ;
285
-
286
- hash_ctl .hash = tag_hash ;
287
- hash_ctl .hcxt = TopMemoryContext ;
288
+ HASHCTL hash_ctl ;
288
289
289
290
if (pgws_profileQueries )
290
291
hash_ctl .keysize = offsetof(ProfileItem , count );
@@ -293,7 +294,7 @@ make_profile_hash()
293
294
294
295
hash_ctl .entrysize = sizeof (ProfileItem );
295
296
return hash_create ("Waits profile hash" , 1024 , & hash_ctl ,
296
- HASH_FUNCTION | HASH_ELEM );
297
+ HASH_ELEM | HASH_BLOBS );
297
298
}
298
299
299
300
/*
@@ -302,8 +303,8 @@ make_profile_hash()
302
303
static int64
303
304
millisecs_diff (TimestampTz tz1 , TimestampTz tz2 )
304
305
{
305
- long secs ;
306
- int microsecs ;
306
+ long secs ;
307
+ int microsecs ;
307
308
308
309
TimestampDifference (tz1 , tz2 , & secs , & microsecs );
309
310
@@ -317,26 +318,19 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2)
317
318
void
318
319
pgws_collector_main (Datum main_arg )
319
320
{
320
- HTAB * profile_hash = NULL ;
321
- History observations ;
322
- MemoryContext old_context ,
323
- collector_context ;
324
- TimestampTz current_ts ,
325
- history_ts ,
326
- profile_ts ;
321
+ HTAB * profile_hash = NULL ;
322
+ History observations ;
323
+ MemoryContext old_context ,
324
+ collector_context ;
325
+ TimestampTz current_ts ,
326
+ history_ts ,
327
+ profile_ts ;
327
328
328
329
/*
329
330
* Establish signal handlers.
330
331
*
331
- * We want CHECK_FOR_INTERRUPTS() to kill off this worker process just as
332
- * it would a normal user backend. To make that happen, we establish a
333
- * signal handler that is a stripped-down version of die(). We don't have
334
- * any equivalent of the backend's command-read loop, where interrupts can
335
- * be processed immediately, so make sure ImmediateInterruptOK is turned
336
- * off.
337
- *
338
- * We also want to respond to the ProcSignal notifications. This is done
339
- * in the upstream provided procsignal_sigusr1_handler, which is
332
+ * We want to respond to the ProcSignal notifications. This is done in
333
+ * the upstream provided procsignal_sigusr1_handler, which is
340
334
* automatically used if a bgworker connects to a database. But since our
341
335
* worker doesn't connect to any database even though it calls
342
336
* InitPostgres, which will still initializze a new backend and thus
@@ -357,7 +351,7 @@ pgws_collector_main(Datum main_arg)
357
351
358
352
CurrentResourceOwner = ResourceOwnerCreate (NULL , "pg_wait_sampling collector" );
359
353
collector_context = AllocSetContextCreate (TopMemoryContext ,
360
- "pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
354
+ "pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
361
355
old_context = MemoryContextSwitchTo (collector_context );
362
356
alloc_history (& observations , pgws_historySize );
363
357
MemoryContextSwitchTo (old_context );
@@ -369,12 +363,12 @@ pgws_collector_main(Datum main_arg)
369
363
370
364
while (1 )
371
365
{
372
- int rc ;
373
- shm_mq_handle * mqh ;
374
- int64 history_diff ,
375
- profile_diff ;
376
- bool write_history ,
377
- write_profile ;
366
+ int rc ;
367
+ shm_mq_handle * mqh ;
368
+ int64 history_diff ,
369
+ profile_diff ;
370
+ bool write_history ,
371
+ write_profile ;
378
372
379
373
/* We need an explicit call for at least ProcSignal notifications. */
380
374
CHECK_FOR_INTERRUPTS ();
@@ -385,14 +379,14 @@ pgws_collector_main(Datum main_arg)
385
379
ProcessConfigFile (PGC_SIGHUP );
386
380
}
387
381
388
- /* Wait calculate time to next sample for history or profile */
382
+ /* Calculate time to next sample for history or profile */
389
383
current_ts = GetCurrentTimestamp ();
390
384
391
385
history_diff = millisecs_diff (history_ts , current_ts );
392
386
profile_diff = millisecs_diff (profile_ts , current_ts );
393
387
394
- write_history = (history_diff >= (int64 )pgws_historyPeriod );
395
- write_profile = (profile_diff >= (int64 )pgws_profilePeriod );
388
+ write_history = (history_diff >= (int64 ) pgws_historyPeriod );
389
+ write_profile = (profile_diff >= (int64 ) pgws_profilePeriod );
396
390
397
391
if (write_history || write_profile )
398
392
{
@@ -421,8 +415,8 @@ pgws_collector_main(Datum main_arg)
421
415
* shared memory.
422
416
*/
423
417
rc = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH ,
424
- Min (pgws_historyPeriod - (int )history_diff ,
425
- pgws_historyPeriod - (int )profile_diff ), PG_WAIT_EXTENSION );
418
+ Min (pgws_historyPeriod - (int ) history_diff ,
419
+ pgws_historyPeriod - (int ) profile_diff ), PG_WAIT_EXTENSION );
426
420
427
421
if (rc & WL_POSTMASTER_DEATH )
428
422
proc_exit (1 );
@@ -443,7 +437,7 @@ pgws_collector_main(Datum main_arg)
443
437
444
438
if (request == HISTORY_REQUEST || request == PROFILE_REQUEST )
445
439
{
446
- shm_mq_result mq_result ;
440
+ shm_mq_result mq_result ;
447
441
448
442
/* Send history or profile */
449
443
shm_mq_set_sender (pgws_collector_mq , MyProc );
@@ -487,12 +481,6 @@ pgws_collector_main(Datum main_arg)
487
481
488
482
MemoryContextReset (collector_context );
489
483
490
- /*
491
- * We're done. Explicitly detach the shared memory segment so that we
492
- * don't get a resource leak warning at commit time. This will fire any
493
- * on_dsm_detach callbacks we've registered, as well. Once that's done,
494
- * we can go ahead and exit.
495
- */
496
484
ereport (LOG , (errmsg ("pg_wait_sampling collector shutting down" )));
497
485
proc_exit (0 );
498
486
}
0 commit comments