diff --git a/collector.c b/collector.c index 4e9d882..f3f141c 100644 --- a/collector.c +++ b/collector.c @@ -225,7 +225,7 @@ send_history(History *observations, shm_mq_handle *mqh) { ereport(WARNING, (errmsg("pg_wait_sampling collector: " - "receiver of message queue have been detached"))); + "receiver of message queue has been detached"))); return; } for (i = 0; i < count; i++) @@ -238,7 +238,7 @@ send_history(History *observations, shm_mq_handle *mqh) { ereport(WARNING, (errmsg("pg_wait_sampling collector: " - "receiver of message queue have been detached"))); + "receiver of message queue has been detached"))); return; } } @@ -260,7 +260,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh) { ereport(WARNING, (errmsg("pg_wait_sampling collector: " - "receiver of message queue have been detached"))); + "receiver of message queue has been detached"))); return; } hash_seq_init(&scan_status, profile_hash); @@ -272,7 +272,7 @@ send_profile(HTAB *profile_hash, shm_mq_handle *mqh) hash_seq_term(&scan_status); ereport(WARNING, (errmsg("pg_wait_sampling collector: " - "receiver of message queue have been detached"))); + "receiver of message queue has been detached"))); return; } } @@ -468,7 +468,7 @@ collector_main(Datum main_arg) case SHM_MQ_DETACHED: ereport(WARNING, (errmsg("pg_wait_sampling collector: " - "receiver of message queue have been " + "receiver of message queue has been " "detached"))); break; default: diff --git a/pg_wait_sampling.c b/pg_wait_sampling.c index 1bd5c76..dbe2959 100644 --- a/pg_wait_sampling.c +++ b/pg_wait_sampling.c @@ -47,6 +47,11 @@ shm_mq *collector_mq = NULL; uint64 *proc_queryids = NULL; CollectorShmqHeader *collector_hdr = NULL; +/* Receiver (backend) local shm_mq pointers and lock */ +shm_mq *recv_mq = NULL; +shm_mq_handle *recv_mqh = NULL; +LOCKTAG queueTag; + static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static PGPROC * search_proc(int backendPid); static PlannedStmt *pgws_planner_hook(Query *parse, @@ -290,6 
+295,14 @@ check_shmem(void) } } +static void +pgws_cleanup_callback(int code, Datum arg) +{ + elog(DEBUG3, "pg_wait_sampling cleanup: detaching shm_mq and releasing queue lock"); + shm_mq_detach_compat(recv_mqh, recv_mq); + LockRelease(&queueTag, ExclusiveLock, false); +} + /* * Module load callback */ @@ -499,16 +512,14 @@ init_lock_tag(LOCKTAG *tag, uint32 lock) static void * receive_array(SHMRequest request, Size item_size, Size *count) { - LOCKTAG queueTag; LOCKTAG collectorTag; - shm_mq *mq; - shm_mq_handle *mqh; shm_mq_result res; Size len, i; void *data; Pointer result, ptr; + MemoryContext oldctx; /* Ensure nobody else trying to send request to queue */ init_lock_tag(&queueTag, PGWS_QUEUE_LOCK); @@ -519,7 +530,7 @@ receive_array(SHMRequest request, Size item_size, Size *count) LockAcquire(&collectorTag, ExclusiveLock, false, false); LockRelease(&collectorTag, ExclusiveLock, false); - mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE); + recv_mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE); collector_hdr->request = request; if (!collector_hdr->latch) @@ -528,33 +539,55 @@ receive_array(SHMRequest request, Size item_size, Size *count) SetLatch(collector_hdr->latch); - shm_mq_set_receiver(mq, MyProc); - mqh = shm_mq_attach(mq, NULL, NULL); + shm_mq_set_receiver(recv_mq, MyProc); - res = shm_mq_receive(mqh, &len, &data, false); - if (res != SHM_MQ_SUCCESS || len != sizeof(*count)) - { - shm_mq_detach_compat(mqh, mq); - elog(ERROR, "Error reading mq."); - } - memcpy(count, data, sizeof(*count)); - - result = palloc(item_size * (*count)); - ptr = result; + /* + * We switch to TopMemoryContext, so that recv_mqh is allocated there + * and is guaranteed to survive until before_shmem_exit callbacks are + * fired. Anyway, shm_mq_detach() will free handler on its own. 
+ */ + oldctx = MemoryContextSwitchTo(TopMemoryContext); + recv_mqh = shm_mq_attach(recv_mq, NULL, NULL); + MemoryContextSwitchTo(oldctx); - for (i = 0; i < *count; i++) + /* + * Now we are surely attached to the shm_mq and got collector's attention. + * If anything goes wrong (e.g. Ctrl+C received from the client) we have + * to clean up some things, i.e. detach from the shm_mq, so that the collector is + * able to continue responding to other requests. + * + * PG_ENSURE_ERROR_CLEANUP() guarantees that the cleanup callback will be + * fired for both ERROR and FATAL. + */ + PG_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback, 0); { - res = shm_mq_receive(mqh, &len, &data, false); - if (res != SHM_MQ_SUCCESS || len != item_size) + res = shm_mq_receive(recv_mqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS || len != sizeof(*count)) { - shm_mq_detach_compat(mqh, mq); + shm_mq_detach_compat(recv_mqh, recv_mq); elog(ERROR, "Error reading mq."); } - memcpy(ptr, data, item_size); - ptr += item_size; + memcpy(count, data, sizeof(*count)); + + result = palloc(item_size * (*count)); + ptr = result; + + for (i = 0; i < *count; i++) + { + res = shm_mq_receive(recv_mqh, &len, &data, false); + if (res != SHM_MQ_SUCCESS || len != item_size) + { + shm_mq_detach_compat(recv_mqh, recv_mq); + elog(ERROR, "Error reading mq."); + } + memcpy(ptr, data, item_size); + ptr += item_size; + } } + PG_END_ENSURE_ERROR_CLEANUP(pgws_cleanup_callback, 0); - shm_mq_detach_compat(mqh, mq); + /* We still have to detach and release lock during normal operation. */ + shm_mq_detach_compat(recv_mqh, recv_mq); LockRelease(&queueTag, ExclusiveLock, false);