26
26
#include "utils/resowner.h"
27
27
#include "pgstat.h"
28
28
29
+ #include "compat.h"
29
30
#include "pg_wait_sampling.h"
30
31
31
32
static volatile sig_atomic_t shutdown_requested = false;
@@ -36,18 +37,18 @@ static void handle_sigterm(SIGNAL_ARGS);
36
37
* Register background worker for collecting waits history.
37
38
*/
38
39
void
39
- register_wait_collector (void )
40
+ pgws_register_wait_collector (void )
40
41
{
41
42
BackgroundWorker worker ;
42
43
43
44
/* Set up background worker parameters */
44
45
memset (& worker , 0 , sizeof (worker ));
45
46
worker .bgw_flags = BGWORKER_SHMEM_ACCESS ;
46
47
worker .bgw_start_time = BgWorkerStart_ConsistentState ;
47
- worker .bgw_restart_time = 0 ;
48
+ worker .bgw_restart_time = 1 ;
48
49
worker .bgw_notify_pid = 0 ;
49
50
snprintf (worker .bgw_library_name , BGW_MAXLEN , "pg_wait_sampling" );
50
- snprintf (worker .bgw_function_name , BGW_MAXLEN , CppAsString (collector_main ));
51
+ snprintf (worker .bgw_function_name , BGW_MAXLEN , CppAsString (pgws_collector_main ));
51
52
snprintf (worker .bgw_name , BGW_MAXLEN , "pg_wait_sampling collector" );
52
53
worker .bgw_main_arg = (Datum ) 0 ;
53
54
RegisterBackgroundWorker (& worker );
@@ -56,7 +57,7 @@ register_wait_collector(void)
56
57
/*
57
58
* Allocate memory for waits history.
58
59
*/
59
- void
60
+ static void
60
61
alloc_history (History * observations , int count )
61
62
{
62
63
observations -> items = (HistoryItem * ) palloc0 (sizeof (HistoryItem ) * count );
@@ -150,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash,
150
151
TimestampTz ts = GetCurrentTimestamp ();
151
152
152
153
/* Realloc waits history if needed */
153
- newSize = collector_hdr -> historySize ;
154
+ newSize = pgws_collector_hdr -> historySize ;
154
155
if (observations -> count != newSize )
155
156
realloc_history (observations , newSize );
156
157
@@ -172,8 +173,8 @@ probe_waits(History *observations, HTAB *profile_hash,
172
173
item .pid = proc -> pid ;
173
174
item .wait_event_info = proc -> wait_event_info ;
174
175
175
- if (collector_hdr -> profileQueries )
176
- item .queryId = proc_queryids [i ];
176
+ if (pgws_collector_hdr -> profileQueries )
177
+ item .queryId = pgws_proc_queryids [i ];
177
178
else
178
179
item .queryId = 0 ;
179
180
@@ -220,7 +221,7 @@ send_history(History *observations, shm_mq_handle *mqh)
220
221
else
221
222
count = observations -> index ;
222
223
223
- mq_result = shm_mq_send (mqh , sizeof (count ), & count , false);
224
+ mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true );
224
225
if (mq_result == SHM_MQ_DETACHED )
225
226
{
226
227
ereport (WARNING ,
@@ -230,10 +231,11 @@ send_history(History *observations, shm_mq_handle *mqh)
230
231
}
231
232
for (i = 0 ; i < count ; i ++ )
232
233
{
233
- mq_result = shm_mq_send (mqh ,
234
+ mq_result = shm_mq_send_compat (mqh ,
234
235
sizeof (HistoryItem ),
235
236
& observations -> items [i ],
236
- false);
237
+ false,
238
+ true);
237
239
if (mq_result == SHM_MQ_DETACHED )
238
240
{
239
241
ereport (WARNING ,
@@ -255,7 +257,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
255
257
Size count = hash_get_num_entries (profile_hash );
256
258
shm_mq_result mq_result ;
257
259
258
- mq_result = shm_mq_send (mqh , sizeof (count ), & count , false);
260
+ mq_result = shm_mq_send_compat (mqh , sizeof (count ), & count , false, true );
259
261
if (mq_result == SHM_MQ_DETACHED )
260
262
{
261
263
ereport (WARNING ,
@@ -266,7 +268,8 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
266
268
hash_seq_init (& scan_status , profile_hash );
267
269
while ((item = (ProfileItem * ) hash_seq_search (& scan_status )) != NULL )
268
270
{
269
- mq_result = shm_mq_send (mqh , sizeof (ProfileItem ), item , false);
271
+ mq_result = shm_mq_send_compat (mqh , sizeof (ProfileItem ), item , false,
272
+ true);
270
273
if (mq_result == SHM_MQ_DETACHED )
271
274
{
272
275
hash_seq_term (& scan_status );
@@ -289,7 +292,7 @@ make_profile_hash()
289
292
hash_ctl .hash = tag_hash ;
290
293
hash_ctl .hcxt = TopMemoryContext ;
291
294
292
- if (collector_hdr -> profileQueries )
295
+ if (pgws_collector_hdr -> profileQueries )
293
296
hash_ctl .keysize = offsetof(ProfileItem , count );
294
297
else
295
298
hash_ctl .keysize = offsetof(ProfileItem , queryId );
@@ -318,7 +321,7 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2)
318
321
* Main routine of wait history collector.
319
322
*/
320
323
void
321
- collector_main (Datum main_arg )
324
+ pgws_collector_main (Datum main_arg )
322
325
{
323
326
HTAB * profile_hash = NULL ;
324
327
History observations ;
@@ -337,28 +340,31 @@ collector_main(Datum main_arg)
337
340
* any equivalent of the backend's command-read loop, where interrupts can
338
341
* be processed immediately, so make sure ImmediateInterruptOK is turned
339
342
* off.
343
+ *
344
+ * We also want to respond to the ProcSignal notifications. This is done
345
+ * in the upstream provided procsignal_sigusr1_handler, which is
346
+ * automatically used if a bgworker connects to a database. But since our
347
+ * worker doesn't connect to any database even though it calls
348
+ * InitPostgres, which will still initializze a new backend and thus
349
+ * partitipate to the ProcSignal infrastructure.
340
350
*/
341
351
pqsignal (SIGTERM , handle_sigterm );
352
+ pqsignal (SIGUSR1 , procsignal_sigusr1_handler );
342
353
BackgroundWorkerUnblockSignals ();
343
-
344
- #if PG_VERSION_NUM >= 110000
345
- InitPostgres (NULL , InvalidOid , NULL , InvalidOid , NULL , false);
346
- #else
347
- InitPostgres (NULL , InvalidOid , NULL , InvalidOid , NULL );
348
- #endif
354
+ InitPostgresCompat (NULL , InvalidOid , NULL , InvalidOid , false, false, NULL );
349
355
SetProcessingMode (NormalProcessing );
350
356
351
357
/* Make pg_wait_sampling recognisable in pg_stat_activity */
352
358
pgstat_report_appname ("pg_wait_sampling collector" );
353
359
354
360
profile_hash = make_profile_hash ();
355
- collector_hdr -> latch = & MyProc -> procLatch ;
361
+ pgws_collector_hdr -> latch = & MyProc -> procLatch ;
356
362
357
363
CurrentResourceOwner = ResourceOwnerCreate (NULL , "pg_wait_sampling collector" );
358
364
collector_context = AllocSetContextCreate (TopMemoryContext ,
359
365
"pg_wait_sampling context" , ALLOCSET_DEFAULT_SIZES );
360
366
old_context = MemoryContextSwitchTo (collector_context );
361
- alloc_history (& observations , collector_hdr -> historySize );
367
+ alloc_history (& observations , pgws_collector_hdr -> historySize );
362
368
MemoryContextSwitchTo (old_context );
363
369
364
370
ereport (LOG , (errmsg ("pg_wait_sampling collector started" )));
@@ -377,21 +383,24 @@ collector_main(Datum main_arg)
377
383
bool write_history ,
378
384
write_profile ;
379
385
386
+ /* We need an explicit call for at least ProcSignal notifications. */
387
+ CHECK_FOR_INTERRUPTS ();
388
+
380
389
/* Wait calculate time to next sample for history or profile */
381
390
current_ts = GetCurrentTimestamp ();
382
391
383
392
history_diff = millisecs_diff (history_ts , current_ts );
384
393
profile_diff = millisecs_diff (profile_ts , current_ts );
385
- history_period = collector_hdr -> historyPeriod ;
386
- profile_period = collector_hdr -> profilePeriod ;
394
+ history_period = pgws_collector_hdr -> historyPeriod ;
395
+ profile_period = pgws_collector_hdr -> profilePeriod ;
387
396
388
397
write_history = (history_diff >= (int64 )history_period );
389
398
write_profile = (profile_diff >= (int64 )profile_period );
390
399
391
400
if (write_history || write_profile )
392
401
{
393
402
probe_waits (& observations , profile_hash ,
394
- write_history , write_profile , collector_hdr -> profilePid );
403
+ write_history , write_profile , pgws_collector_hdr -> profilePid );
395
404
396
405
if (write_history )
397
406
{
@@ -430,67 +439,58 @@ collector_main(Datum main_arg)
430
439
ResetLatch (& MyProc -> procLatch );
431
440
432
441
/* Handle request if any */
433
- if (collector_hdr -> request != NO_REQUEST )
442
+ if (pgws_collector_hdr -> request != NO_REQUEST )
434
443
{
435
444
LOCKTAG tag ;
436
- SHMRequest request = collector_hdr -> request ;
445
+ SHMRequest request ;
437
446
438
- init_lock_tag (& tag , PGWS_COLLECTOR_LOCK );
447
+ pgws_init_lock_tag (& tag , PGWS_COLLECTOR_LOCK );
439
448
440
449
LockAcquire (& tag , ExclusiveLock , false, false);
441
- collector_hdr -> request = NO_REQUEST ;
450
+ request = pgws_collector_hdr -> request ;
451
+ pgws_collector_hdr -> request = NO_REQUEST ;
442
452
443
- PG_TRY ();
453
+ if ( request == HISTORY_REQUEST || request == PROFILE_REQUEST )
444
454
{
445
- if (request == HISTORY_REQUEST || request == PROFILE_REQUEST )
446
- {
447
- shm_mq_result mq_result ;
448
-
449
- /* Send history or profile */
450
- shm_mq_set_sender (collector_mq , MyProc );
451
- mqh = shm_mq_attach (collector_mq , NULL , NULL );
452
- mq_result = shm_mq_wait_for_attach (mqh );
453
- switch (mq_result )
454
- {
455
- case SHM_MQ_SUCCESS :
456
- switch (request )
457
- {
458
- case HISTORY_REQUEST :
459
- send_history (& observations , mqh );
460
- break ;
461
- case PROFILE_REQUEST :
462
- send_profile (profile_hash , mqh );
463
- break ;
464
- default :
465
- AssertState (false);
466
- }
467
- break ;
468
- case SHM_MQ_DETACHED :
469
- ereport (WARNING ,
470
- (errmsg ("pg_wait_sampling collector: "
471
- "receiver of message queue has been "
472
- "detached" )));
473
- break ;
474
- default :
475
- AssertState (false);
476
- }
477
- shm_mq_detach_compat (mqh , collector_mq );
478
- }
479
- else if (request == PROFILE_RESET )
455
+ shm_mq_result mq_result ;
456
+
457
+ /* Send history or profile */
458
+ shm_mq_set_sender (pgws_collector_mq , MyProc );
459
+ mqh = shm_mq_attach (pgws_collector_mq , NULL , NULL );
460
+ mq_result = shm_mq_wait_for_attach (mqh );
461
+ switch (mq_result )
480
462
{
481
- /* Reset profile hash */
482
- hash_destroy (profile_hash );
483
- profile_hash = make_profile_hash ();
463
+ case SHM_MQ_SUCCESS :
464
+ switch (request )
465
+ {
466
+ case HISTORY_REQUEST :
467
+ send_history (& observations , mqh );
468
+ break ;
469
+ case PROFILE_REQUEST :
470
+ send_profile (profile_hash , mqh );
471
+ break ;
472
+ default :
473
+ AssertState (false);
474
+ }
475
+ break ;
476
+ case SHM_MQ_DETACHED :
477
+ ereport (WARNING ,
478
+ (errmsg ("pg_wait_sampling collector: "
479
+ "receiver of message queue have been "
480
+ "detached" )));
481
+ break ;
482
+ default :
483
+ AssertState (false);
484
484
}
485
-
486
- LockRelease (& tag , ExclusiveLock , false);
485
+ shm_mq_detach_compat (mqh , pgws_collector_mq );
487
486
}
488
- PG_CATCH ();
487
+ else if ( request == PROFILE_RESET )
489
488
{
490
- LockRelease (& tag , ExclusiveLock , false);
491
- PG_RE_THROW ();
489
+ /* Reset profile hash */
490
+ hash_destroy (profile_hash );
491
+ profile_hash = make_profile_hash ();
492
492
}
493
- PG_END_TRY ( );
493
+ LockRelease ( & tag , ExclusiveLock , false );
494
494
}
495
495
}
496
496
0 commit comments