Skip to content

Commit 4f65ca6

Browse files
authored
Merge pull request #31 from postgrespro/issue#29
Resolve Issue#29
2 parents 38e5d3b + a0b05e4 commit 4f65ca6

File tree

2 files changed

+61
-28
lines changed

2 files changed

+61
-28
lines changed

Diff for: collector.c

+5-5
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ send_history(History *observations, shm_mq_handle *mqh)
225225
{
226226
ereport(WARNING,
227227
(errmsg("pg_wait_sampling collector: "
228-
"receiver of message queue have been detached")));
228+
"receiver of message queue has been detached")));
229229
return;
230230
}
231231
for (i = 0; i < count; i++)
@@ -238,7 +238,7 @@ send_history(History *observations, shm_mq_handle *mqh)
238238
{
239239
ereport(WARNING,
240240
(errmsg("pg_wait_sampling collector: "
241-
"receiver of message queue have been detached")));
241+
"receiver of message queue has been detached")));
242242
return;
243243
}
244244
}
@@ -260,7 +260,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
260260
{
261261
ereport(WARNING,
262262
(errmsg("pg_wait_sampling collector: "
263-
"receiver of message queue have been detached")));
263+
"receiver of message queue has been detached")));
264264
return;
265265
}
266266
hash_seq_init(&scan_status, profile_hash);
@@ -272,7 +272,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh)
272272
hash_seq_term(&scan_status);
273273
ereport(WARNING,
274274
(errmsg("pg_wait_sampling collector: "
275-
"receiver of message queue have been detached")));
275+
"receiver of message queue has been detached")));
276276
return;
277277
}
278278
}
@@ -468,7 +468,7 @@ collector_main(Datum main_arg)
468468
case SHM_MQ_DETACHED:
469469
ereport(WARNING,
470470
(errmsg("pg_wait_sampling collector: "
471-
"receiver of message queue have been "
471+
"receiver of message queue has been "
472472
"detached")));
473473
break;
474474
default:

Diff for: pg_wait_sampling.c

+56-23
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@ shm_mq *collector_mq = NULL;
4747
uint64 *proc_queryids = NULL;
4848
CollectorShmqHeader *collector_hdr = NULL;
4949

50+
/* Receiver (backend) local shm_mq pointers and lock */
51+
shm_mq *recv_mq = NULL;
52+
shm_mq_handle *recv_mqh = NULL;
53+
LOCKTAG queueTag;
54+
5055
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
5156
static PGPROC * search_proc(int backendPid);
5257
static PlannedStmt *pgws_planner_hook(Query *parse,
@@ -290,6 +295,14 @@ check_shmem(void)
290295
}
291296
}
292297

298+
static void
299+
pgws_cleanup_callback(int code, Datum arg)
300+
{
301+
elog(DEBUG3, "pg_wait_sampling cleanup: detaching shm_mq and releasing queue lock");
302+
shm_mq_detach_compat(recv_mqh, recv_mq);
303+
LockRelease(&queueTag, ExclusiveLock, false);
304+
}
305+
293306
/*
294307
* Module load callback
295308
*/
@@ -499,16 +512,14 @@ init_lock_tag(LOCKTAG *tag, uint32 lock)
499512
static void *
500513
receive_array(SHMRequest request, Size item_size, Size *count)
501514
{
502-
LOCKTAG queueTag;
503515
LOCKTAG collectorTag;
504-
shm_mq *mq;
505-
shm_mq_handle *mqh;
506516
shm_mq_result res;
507517
Size len,
508518
i;
509519
void *data;
510520
Pointer result,
511521
ptr;
522+
MemoryContext oldctx;
512523

513524
/* Ensure nobody else trying to send request to queue */
514525
init_lock_tag(&queueTag, PGWS_QUEUE_LOCK);
@@ -519,7 +530,7 @@ receive_array(SHMRequest request, Size item_size, Size *count)
519530
LockAcquire(&collectorTag, ExclusiveLock, false, false);
520531
LockRelease(&collectorTag, ExclusiveLock, false);
521532

522-
mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE);
533+
recv_mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE);
523534
collector_hdr->request = request;
524535

525536
if (!collector_hdr->latch)
@@ -528,33 +539,55 @@ receive_array(SHMRequest request, Size item_size, Size *count)
528539

529540
SetLatch(collector_hdr->latch);
530541

531-
shm_mq_set_receiver(mq, MyProc);
532-
mqh = shm_mq_attach(mq, NULL, NULL);
542+
shm_mq_set_receiver(recv_mq, MyProc);
533543

534-
res = shm_mq_receive(mqh, &len, &data, false);
535-
if (res != SHM_MQ_SUCCESS || len != sizeof(*count))
536-
{
537-
shm_mq_detach_compat(mqh, mq);
538-
elog(ERROR, "Error reading mq.");
539-
}
540-
memcpy(count, data, sizeof(*count));
541-
542-
result = palloc(item_size * (*count));
543-
ptr = result;
544+
/*
545+
* We switch to TopMemoryContext, so that recv_mqh is allocated there
546+
* and is guaranteed to survive until before_shmem_exit callbacks are
547+
* fired. Anyway, shm_mq_detach() will free handler on its own.
548+
*/
549+
oldctx = MemoryContextSwitchTo(TopMemoryContext);
550+
recv_mqh = shm_mq_attach(recv_mq, NULL, NULL);
551+
MemoryContextSwitchTo(oldctx);
544552

545-
for (i = 0; i < *count; i++)
553+
/*
554+
* Now we surely attached to the shm_mq and got collector's attention.
555+
* If anything went wrong (e.g. Ctrl+C received from the client) we have
556+
* to cleanup some things, i.e. detach from the shm_mq, so collector was
557+
* able to continue responding to other requests.
558+
*
559+
* PG_ENSURE_ERROR_CLEANUP() guaranties that cleanup callback will be
560+
* fired for both ERROR and FATAL.
561+
*/
562+
PG_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback, 0);
546563
{
547-
res = shm_mq_receive(mqh, &len, &data, false);
548-
if (res != SHM_MQ_SUCCESS || len != item_size)
564+
res = shm_mq_receive(recv_mqh, &len, &data, false);
565+
if (res != SHM_MQ_SUCCESS || len != sizeof(*count))
549566
{
550-
shm_mq_detach_compat(mqh, mq);
567+
shm_mq_detach_compat(recv_mqh, recv_mq);
551568
elog(ERROR, "Error reading mq.");
552569
}
553-
memcpy(ptr, data, item_size);
554-
ptr += item_size;
570+
memcpy(count, data, sizeof(*count));
571+
572+
result = palloc(item_size * (*count));
573+
ptr = result;
574+
575+
for (i = 0; i < *count; i++)
576+
{
577+
res = shm_mq_receive(recv_mqh, &len, &data, false);
578+
if (res != SHM_MQ_SUCCESS || len != item_size)
579+
{
580+
shm_mq_detach_compat(recv_mqh, recv_mq);
581+
elog(ERROR, "Error reading mq.");
582+
}
583+
memcpy(ptr, data, item_size);
584+
ptr += item_size;
585+
}
555586
}
587+
PG_END_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback, 0);
556588

557-
shm_mq_detach_compat(mqh, mq);
589+
/* We still have to detach and release lock during normal operation. */
590+
shm_mq_detach_compat(recv_mqh, recv_mq);
558591

559592
LockRelease(&queueTag, ExclusiveLock, false);
560593

0 commit comments

Comments
 (0)