From a6b31ea68a894cd2d8efc8740297350fceded586 Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Fri, 20 Feb 2026 12:13:48 +0300 Subject: [PATCH 1/2] Fix races between pg_query_state and pg_progress_bar. pg_progress_bar should be locked with PG_QS_RCV_KEY, because it is using the same shmem as pg_query_state. --- pg_query_state.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pg_query_state.c b/pg_query_state.c index 739a44e..cbade05 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -132,6 +132,7 @@ pg_qs_shmem_startup(void) void *shmem; int num_toc = 0; + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); shmem = ShmemInitStruct("pg_query_state", shmem_size, &found); if (!found) { @@ -162,6 +163,7 @@ pg_qs_shmem_startup(void) mq = shm_toc_lookup(toc, num_toc++, false); #endif } + LWLockRelease(AddinShmemInitLock); if (prev_shmem_startup_hook) prev_shmem_startup_hook(); @@ -1434,6 +1436,7 @@ pg_progress_bar(PG_FUNCTION_ARGS) List *msgs; double progress; double old_progress; + LOCKTAG tag; if (PG_NARGS() == 2) { @@ -1467,6 +1470,7 @@ pg_progress_bar(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("backend with pid=%d not found", pid))); + LockShmem(&tag, PG_QS_RCV_KEY); counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -1493,16 +1497,19 @@ pg_progress_bar(PG_FUNCTION_ARGS) { case QUERY_NOT_RUNNING: elog(INFO, "query not runing"); + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); break; case STAT_DISABLED: elog(INFO, "query execution statistics disabled"); + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); default: break; } if (msg->result_code == QS_RETURNED && delay == 0) { + UnlockShmem(&tag); progress = GetCurrentNumericState(msg); if (progress < 0) { @@ -1545,7 +1552,9 @@ pg_progress_bar(PG_FUNCTION_ARGS) } if (progress > -1) elog(INFO, "\rProgress = 1.000000"); + UnlockShmem(&tag); 
PG_RETURN_FLOAT8((float8) 1); } + UnlockShmem(&tag); PG_RETURN_FLOAT8((float8) -1); } From d6427057a0a9be600f03f269a35fb9c8b7f31f14 Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Thu, 26 Feb 2026 14:43:40 +0300 Subject: [PATCH 2/2] Fix inter-process communication issues. 1. Fix reqid logic; it now controls backend interruptions. It is used to check that the currently processed request has the same reqid as the one polled from the shmem queue. 2. All inter-process communication is now based on shmem queue reads/sends with appropriate timeouts. --- pg_query_state.c | 296 +++++++++++++++++++++++------------------------ pg_query_state.h | 42 +++++-- signal_handler.c | 91 ++++++--------- 3 files changed, 214 insertions(+), 215 deletions(-) diff --git a/pg_query_state.c b/pg_query_state.c index cbade05..d3056ff 100644 --- a/pg_query_state.c +++ b/pg_query_state.c @@ -64,21 +64,12 @@ static shm_mq_result receive_msg_by_parts(shm_mq_handle *mqh, Size *total, /* Global variables */ List *QueryDescStack = NIL; -static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; -static ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; -static ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; +ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL; +ProcSignalReason QueryStatePollReason = INVALID_PROCSIGNAL; +ProcSignalReason WorkerPollReason = INVALID_PROCSIGNAL; static bool module_initialized = false; static int reqid = 0; -typedef struct -{ - slock_t mutex; /* protect concurrent access to `userid` */ - Oid userid; - Latch *caller; - pg_atomic_uint32 n_peers; -} RemoteUserIdResult; - -static void SendCurrentUserId(void); static void SendBgWorkerPids(void); static Oid GetRemoteBackendUserId(PGPROC *proc); static List *GetRemoteBackendWorkers(PGPROC *proc); @@ -90,12 +81,27 @@ static List *GetRemoteBackendQueryStates(PGPROC *leader, bool buffers, bool triggers, ExplainFormat format); +static shm_mq_result shm_mq_receive_with_timeout(shm_mq_handle *mqh, + Size 
*nbytesp, + void **datap, + int64 timeout); /* Shared memory variables */ static shm_toc *toc = NULL; -static RemoteUserIdResult *counterpart_userid = NULL; pg_qs_params *params = NULL; shm_mq *mq = NULL; +/* + * reqid and *mq_req_id are used to match requests with responses on the requestor side. + * + * - reqid is a static variable used for tracking the current request id on the requestor side. + * - *mq_req_id is the current shared message queue reqid. + * + * *mq_req_id is used in the signal handler as the processed reqid and is copied into shm_mq_msg.reqid; + * further, when the response arrives, this value is compared with reqid on the requestor side. + * This prevents processing stale or outdated replies from previous attempts, timeouts, + * or concurrent calls. + */ +uint32 *mq_req_id = NULL; /* * Estimate amount of shared memory needed. @@ -111,9 +117,9 @@ pg_qs_shmem_size() nkeys = 3; - shm_toc_estimate_chunk(&e, sizeof(RemoteUserIdResult)); shm_toc_estimate_chunk(&e, sizeof(pg_qs_params)); shm_toc_estimate_chunk(&e, (Size) QUEUE_SIZE); + shm_toc_estimate_chunk(&e, sizeof(uint32)); shm_toc_estimate_keys(&e, nkeys); size = shm_toc_estimate(&e); @@ -138,29 +144,28 @@ pg_qs_shmem_startup(void) { toc = shm_toc_create(PG_QS_MODULE_KEY, shmem, shmem_size); - counterpart_userid = shm_toc_allocate(toc, sizeof(RemoteUserIdResult)); - shm_toc_insert(toc, num_toc++, counterpart_userid); - SpinLockInit(&counterpart_userid->mutex); - pg_atomic_init_u32(&counterpart_userid->n_peers, 0); - params = shm_toc_allocate(toc, sizeof(pg_qs_params)); shm_toc_insert(toc, num_toc++, params); mq = shm_toc_allocate(toc, QUEUE_SIZE); shm_toc_insert(toc, num_toc++, mq); + + mq_req_id = shm_toc_allocate(toc, sizeof(uint32)); + shm_toc_insert(toc, num_toc++, mq_req_id); + *mq_req_id = 0; } else { toc = shm_toc_attach(PG_QS_MODULE_KEY, shmem); #if PG_VERSION_NUM < 100000 - counterpart_userid = shm_toc_lookup(toc, num_toc++); params = shm_toc_lookup(toc, num_toc++); mq = shm_toc_lookup(toc, num_toc++); + mq_req_id = 
shm_toc_lookup(toc, num_toc++); #else - counterpart_userid = shm_toc_lookup(toc, num_toc++, false); params = shm_toc_lookup(toc, num_toc++, false); mq = shm_toc_lookup(toc, num_toc++, false); + mq_req_id = shm_toc_lookup(toc, num_toc++, false); #endif } LWLockRelease(AddinShmemInitLock); @@ -262,8 +267,7 @@ pg_qs_shmem_request(void) /* * ExecutorStart hook: - * set up flags to store runtime statistics, - * push current query description in global stack + * Set up flags to store runtime statistics. */ static void qs_ExecutorStart(QueryDesc *queryDesc, int eflags) @@ -525,8 +529,6 @@ pg_query_state(PG_FUNCTION_ARGS) shm_mq_msg *msg; List *bg_worker_procs = NIL; List *msgs; - instr_time start_time; - instr_time cur_time; if (!module_initialized) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -565,22 +567,7 @@ pg_query_state(PG_FUNCTION_ARGS) */ LockShmem(&tag, PG_QS_RCV_KEY); - INSTR_TIME_SET_CURRENT(start_time); - - while (pg_atomic_read_u32(&counterpart_userid->n_peers) != 0) - { - pg_usleep(1000000); /* wait one second */ - CHECK_FOR_INTERRUPTS(); - - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - - if (INSTR_TIME_GET_MILLISEC(cur_time) > MAX_RCV_TIMEOUT) - { - elog(WARNING, "pg_query_state: last request was interrupted"); - break; - } - } + reqid = *mq_req_id + 1; counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) @@ -590,10 +577,6 @@ pg_query_state(PG_FUNCTION_ARGS) errmsg("permission denied"))); } - pg_atomic_write_u32(&counterpart_userid->n_peers, 1); - params->reqid = ++reqid; - pg_write_barrier(); - bg_worker_procs = GetRemoteBackendWorkers(proc); msgs = GetRemoteBackendQueryStates(proc, @@ -746,16 +729,6 @@ pg_query_state(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); } -static void -SendCurrentUserId(void) -{ - SpinLockAcquire(&counterpart_userid->mutex); - counterpart_userid->userid = GetUserId(); - SpinLockRelease(&counterpart_userid->mutex); - - 
SetLatch(counterpart_userid->caller); -} - /* * Extract effective user id from backend on which `proc` points. * @@ -767,7 +740,12 @@ SendCurrentUserId(void) static Oid GetRemoteBackendUserId(PGPROC *proc) { - Oid result; + int sig_result; + shm_mq_handle *mqh; + shm_mq_result mq_receive_result; + shm_mq_userid_msg *msg; + Size msg_len; + LOCKTAG tag; #if PG_VERSION_NUM >= 170000 Assert(proc && proc->vxid.procNumber != INVALID_PROC_NUMBER); @@ -776,40 +754,53 @@ GetRemoteBackendUserId(PGPROC *proc) #endif Assert(UserIdPollReason != INVALID_PROCSIGNAL); - Assert(counterpart_userid); + Assert(mq); - counterpart_userid->userid = InvalidOid; - counterpart_userid->caller = MyLatch; - pg_write_barrier(); + LockShmem(&tag, PG_QS_SND_KEY); + /* fill in parameters of query state request */ + params->reason = UserIdPollReason; + mq = shm_mq_create(mq, QUEUE_SIZE); + shm_mq_set_sender(mq, proc); + shm_mq_set_receiver(mq, MyProc); + *mq_req_id = reqid; + UnlockShmem(&tag); #if PG_VERSION_NUM >= 170000 - SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber); + sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->vxid.procNumber); #else - SendProcSignal(proc->pid, UserIdPollReason, proc->backendId); + sig_result = SendProcSignal(proc->pid, UserIdPollReason, proc->backendId); #endif - for (;;) + if (sig_result == -1) { - SpinLockAcquire(&counterpart_userid->mutex); - result = counterpart_userid->userid; - SpinLockRelease(&counterpart_userid->mutex); - - if (result != InvalidOid) - break; + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("can't send signal to get remote backend userid"))); + return InvalidOid; + } + mqh = shm_mq_attach(mq, NULL, NULL); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &msg_len, + (void **) &msg, + MAX_RCV_TIMEOUT); + if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != sizeof(shm_mq_userid_msg)) + { #if PG_VERSION_NUM < 100000 - WaitLatch(MyLatch, WL_LATCH_SET, 0); 
-#elif PG_VERSION_NUM < 120000 - WaitLatch(MyLatch, WL_LATCH_SET, 0, PG_WAIT_EXTENSION); + shm_mq_detach(mq); #else - WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, - PG_WAIT_EXTENSION); + shm_mq_detach(mqh); #endif - CHECK_FOR_INTERRUPTS(); - ResetLatch(MyLatch); + ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("error in message queue data transmitting"))); + return InvalidOid; } - return result; +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif + return msg->userid; } /* @@ -835,7 +826,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh, { shm_mq_result mq_receive_result; - mq_receive_result = receive_msg_by_parts(mqh, nbytesp, datap, timeout, &rc, true); + mq_receive_result = receive_msg_by_parts(mqh, nbytesp, datap, delay, &rc, true); if (mq_receive_result != SHM_MQ_WOULD_BLOCK) return mq_receive_result; if (rc & WL_TIMEOUT || delay <= 0) @@ -918,12 +909,17 @@ SendBgWorkerPids(void) int i; shm_mq_handle *mqh; LOCKTAG tag; - shm_mq_result result; + msg_by_parts_result result; LockShmem(&tag, PG_QS_SND_KEY); - mqh = shm_mq_attach(mq, NULL, NULL); + if (shm_mq_get_sender(mq) != MyProc || params->reason != WorkerPollReason) + { + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); + goto connection_cleanup; + } + foreach(iter, QueryDescStack) { QueryDesc *curQueryDesc = (QueryDesc *) lfirst(iter); @@ -936,7 +932,7 @@ SendBgWorkerPids(void) msg_len = offsetof(BgWorkerPids, pids) + sizeof(pid_t) * list_length(all_workers); msg = palloc(msg_len); - msg->reqid = params->reqid; + msg->reqid = *mq_req_id; msg->number = list_length(all_workers); i = 0; foreach(iter, all_workers) @@ -947,16 +943,23 @@ SendBgWorkerPids(void) msg->pids[i++] = current_pid; } -#if PG_VERSION_NUM < 150000 - result = shm_mq_send(mqh, msg_len, msg, false); -#else - result = shm_mq_send(mqh, msg_len, msg, false, true); -#endif + result = 
send_msg_by_parts(mqh, msg_len, msg); + + pfree(msg); /* Check for failure. */ - if(result == SHM_MQ_DETACHED) - elog(WARNING, "could not send message queue to shared-memory queue: receiver has been detached"); + if(result != MSG_BY_PARTS_SUCCEEDED) + { + elog(WARNING, "pg_query_state: peer seems to have detached"); + goto connection_cleanup; + } +connection_cleanup: +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif UnlockShmem(&tag); } @@ -985,9 +988,12 @@ GetRemoteBackendWorkers(PGPROC *proc) Assert(mq); LockShmem(&tag, PG_QS_SND_KEY); + /* fill in parameters of query state request */ + params->reason = WorkerPollReason; mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, proc); shm_mq_set_receiver(mq, MyProc); + *mq_req_id = reqid; UnlockShmem(&tag); #if PG_VERSION_NUM >= 170000 @@ -1000,7 +1006,10 @@ GetRemoteBackendWorkers(PGPROC *proc) goto signal_error; mqh = shm_mq_attach(mq, NULL, NULL); - mq_receive_result = shm_mq_receive(mqh, &msg_len, (void **) &msg, false); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &msg_len, + (void **) &msg, + MAX_RCV_TIMEOUT); if (mq_receive_result != SHM_MQ_SUCCESS || msg == NULL || msg->reqid != reqid || msg_len != offsetof(BgWorkerPids, pids) + msg->number*sizeof(pid_t)) goto mq_error; @@ -1012,7 +1021,6 @@ GetRemoteBackendWorkers(PGPROC *proc) continue; result = lcons(current_proc, result); } - #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); #else @@ -1025,21 +1033,17 @@ GetRemoteBackendWorkers(PGPROC *proc) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("invalid send signal"))); mq_error: +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("error in message queue data transmitting"))); return NIL; } -static shm_mq_msg * -copy_msg(shm_mq_msg *msg) -{ - shm_mq_msg *result = palloc(msg->length); - - memcpy(result, msg, msg->length); - return result; -} - static shm_mq_result 
receive_msg_by_parts(shm_mq_handle *mqh, Size *total, void **datap, int64 timeout, int *rc, bool nowait) @@ -1115,7 +1119,6 @@ GetRemoteBackendQueryStates(PGPROC *leader, ExplainFormat format) { List *result = NIL; - List *alive_procs = NIL; ListCell *iter; int sig_result; shm_mq_handle *mqh; @@ -1127,20 +1130,20 @@ GetRemoteBackendQueryStates(PGPROC *leader, Assert(QueryStatePollReason != INVALID_PROCSIGNAL); Assert(mq); + /* initialize message queue that will transfer query states */ + LockShmem(&tag, PG_QS_SND_KEY); /* fill in parameters of query state request */ + params->reason = QueryStatePollReason; params->verbose = verbose; params->costs = costs; params->timing = timing; params->buffers = buffers; params->triggers = triggers; params->format = format; - pg_write_barrier(); - - /* initialize message queue that will transfer query states */ - LockShmem(&tag, PG_QS_SND_KEY); mq = shm_mq_create(mq, QUEUE_SIZE); shm_mq_set_sender(mq, leader); shm_mq_set_receiver(mq, MyProc); + *mq_req_id = reqid; UnlockShmem(&tag); /* @@ -1159,58 +1162,34 @@ GetRemoteBackendQueryStates(PGPROC *leader, if (sig_result == -1) goto signal_error; - foreach(iter, pworkers) - { - PGPROC *proc = (PGPROC *) lfirst(iter); - if (!proc || !proc->pid) - continue; - - pg_atomic_add_fetch_u32(&counterpart_userid->n_peers, 1); - -#if PG_VERSION_NUM >= 170000 - sig_result = SendProcSignal(proc->pid, - QueryStatePollReason, - proc->vxid.procNumber); -#else - sig_result = SendProcSignal(proc->pid, - QueryStatePollReason, - proc->backendId); -#endif - - if (sig_result == -1) - { - if (errno != ESRCH) - goto signal_error; - continue; - } - - alive_procs = lappend(alive_procs, proc); - } /* extract query state from leader process */ mqh = shm_mq_attach(mq, NULL, NULL); elog(DEBUG1, "Wait response from leader %d", leader->pid); - mq_receive_result = receive_msg_by_parts(mqh, &len, (void **) &msg, - 0, NULL, false); + mq_receive_result = shm_mq_receive_with_timeout(mqh, + &len, + (void **) &msg, + 
MAX_RCV_TIMEOUT); if (mq_receive_result != SHM_MQ_SUCCESS) goto mq_error; if (msg->reqid != reqid) goto mq_error; Assert(len == msg->length); - result = lappend(result, copy_msg(msg)); + result = lappend(result, msg); #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); #else shm_mq_detach(mqh); #endif - /* * collect results from all alived parallel workers */ - foreach(iter, alive_procs) + foreach(iter, pworkers) { PGPROC *proc = (PGPROC *) lfirst(iter); + if (!proc || !proc->pid) + continue; /* prepare message queue to transfer data */ elog(DEBUG1, "Wait response from worker %d", proc->pid); @@ -1220,8 +1199,26 @@ GetRemoteBackendQueryStates(PGPROC *leader, shm_mq_set_receiver(mq, MyProc); /* this function notifies the counterpart to come into data transfer */ + *mq_req_id = reqid; UnlockShmem(&tag); +#if PG_VERSION_NUM >= 170000 + sig_result = SendProcSignal(proc->pid, + QueryStatePollReason, + proc->vxid.procNumber); +#else + sig_result = SendProcSignal(proc->pid, + QueryStatePollReason, + proc->backendId); +#endif + + if (sig_result == -1) + { + if (errno != ESRCH) + goto signal_error; + continue; + } + /* retrieve result data from message queue */ mqh = shm_mq_attach(mq, NULL, NULL); mq_receive_result = shm_mq_receive_with_timeout(mqh, @@ -1238,7 +1235,7 @@ GetRemoteBackendQueryStates(PGPROC *leader, Assert(len == msg->length); /* aggregate result data */ - result = lappend(result, copy_msg(msg)); + result = lappend(result, msg); #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); @@ -1263,15 +1260,6 @@ GetRemoteBackendQueryStates(PGPROC *leader, return NIL; } -void -DetachPeer(void) -{ - int n_peers = pg_atomic_fetch_sub_u32(&counterpart_userid->n_peers, 1); - if (n_peers <= 0) - ereport(LOG, (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("pg_query_state peer is not responding"))); -} - /* * Extract the number of actual rows and planned rows from * the plan for one node in text format. 
Returns their ratio, @@ -1471,6 +1459,9 @@ pg_progress_bar(PG_FUNCTION_ARGS) errmsg("backend with pid=%d not found", pid))); LockShmem(&tag, PG_QS_RCV_KEY); + + reqid = *mq_req_id + 1; + counterpart_user_id = GetRemoteBackendUserId(proc); if (!(superuser() || GetUserId() == counterpart_user_id)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), @@ -1478,11 +1469,6 @@ pg_progress_bar(PG_FUNCTION_ARGS) old_progress = 0; progress = 0; - if (SRF_IS_FIRSTCALL()) - { - pg_atomic_write_u32(&counterpart_userid->n_peers, 1); - params->reqid = ++reqid; - } bg_worker_procs = GetRemoteBackendWorkers(proc); msgs = GetRemoteBackendQueryStates(proc, @@ -1490,7 +1476,11 @@ pg_progress_bar(PG_FUNCTION_ARGS) 0, 1, 0, 0, 0, EXPLAIN_FORMAT_JSON); if (list_length(msgs) == 0) + { elog(WARNING, "backend does not reply"); + UnlockShmem(&tag); + PG_RETURN_FLOAT8((float8) -1); + } msg = (shm_mq_msg *) linitial(msgs); switch (msg->result_code) @@ -1547,7 +1537,11 @@ pg_progress_bar(PG_FUNCTION_ARGS) 0, 1, 0, 0, 0, EXPLAIN_FORMAT_JSON); if (list_length(msgs) == 0) + { elog(WARNING, "backend does not reply"); + UnlockShmem(&tag); + PG_RETURN_FLOAT8((float8) -1); + } msg = (shm_mq_msg *) linitial(msgs); } if (progress > -1) diff --git a/pg_query_state.h b/pg_query_state.h index 8272069..2ccaba5 100644 --- a/pg_query_state.h +++ b/pg_query_state.h @@ -50,9 +50,18 @@ typedef enum { QUERY_NOT_RUNNING, /* Backend doesn't execute any query */ STAT_DISABLED, /* Collection of execution statistics is disabled */ - QS_RETURNED /* Backend succx[esfully returned its query state */ + QS_RETURNED /* Backend successfully returned its query state */ } PG_QS_RequestResult; +/* + * A self-explanatory enum describing the send_msg_by_parts results + */ +typedef enum +{ + MSG_BY_PARTS_SUCCEEDED, + MSG_BY_PARTS_FAILED +} msg_by_parts_result; + /* * Format of transmited data through message queue */ @@ -68,12 +77,21 @@ typedef struct text records */ } shm_mq_msg; +/* + * User id transmit format. 
+ */ +typedef struct +{ + Oid userid; + uint32 reqid; +} shm_mq_userid_msg; + #define BASE_SIZEOF_SHM_MQ_MSG (offsetof(shm_mq_msg, stack_depth)) /* pg_query_state arguments */ typedef struct { - int reqid; + ProcSignalReason reason; bool verbose; bool costs; bool timing; @@ -83,17 +101,23 @@ typedef struct } pg_qs_params; /* pg_query_state */ -extern bool pg_qs_enable; -extern bool pg_qs_timing; -extern bool pg_qs_buffers; -extern List *QueryDescStack; -extern pg_qs_params *params; -extern shm_mq *mq; +extern bool pg_qs_enable; +extern bool pg_qs_timing; +extern bool pg_qs_buffers; +extern List *QueryDescStack; +extern pg_qs_params * params; +extern shm_mq *mq; +extern uint32 *mq_req_id; + +extern ProcSignalReason UserIdPollReason; +extern ProcSignalReason QueryStatePollReason; +extern ProcSignalReason WorkerPollReason; /* signal_handler.c */ extern void SendQueryState(void); -extern void DetachPeer(void); +extern void SendCurrentUserId(void); extern void UnlockShmem(LOCKTAG *tag); extern void LockShmem(LOCKTAG *tag, uint32 key); +extern msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); #endif diff --git a/signal_handler.c b/signal_handler.c index 09b1cf2..e4962a5 100644 --- a/signal_handler.c +++ b/signal_handler.c @@ -31,17 +31,6 @@ typedef struct char *plan; } stack_frame; -/* - * An self-explanarory enum describing the send_msg_by_parts results - */ -typedef enum -{ - MSG_BY_PARTS_SUCCEEDED, - MSG_BY_PARTS_FAILED -} msg_by_parts_result; - -static msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data); - /* * Get List of stack_frames as a stack of function calls starting from outermost call. * Each entry contains query text and query state in form of EXPLAIN ANALYZE output. 
@@ -197,7 +186,7 @@ shm_mq_send_nonblocking(shm_mq_handle *mqh, Size nbytes, const void *data, Size * send_msg_by_parts sends data through the queue as a bunch of messages * of smaller size */ -static msg_by_parts_result +msg_by_parts_result send_msg_by_parts(shm_mq_handle *mqh, Size nbytes, const void *data) { int bytes_left; @@ -229,55 +218,22 @@ void SendQueryState(void) { shm_mq_handle *mqh; - instr_time start_time; - instr_time cur_time; - int64 delay = MAX_SND_TIMEOUT; - int reqid = params->reqid; LOCKTAG tag; - INSTR_TIME_SET_CURRENT(start_time); - - /* wait until caller sets this process as sender to message queue */ - for (;;) - { - if (shm_mq_get_sender(mq) == MyProc) - break; - -#if PG_VERSION_NUM < 100000 - WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay); -#elif PG_VERSION_NUM < 120000 - WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT, delay, PG_WAIT_IPC); -#else - WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT, delay, PG_WAIT_IPC); -#endif - INSTR_TIME_SET_CURRENT(cur_time); - INSTR_TIME_SUBTRACT(cur_time, start_time); - - delay = MAX_SND_TIMEOUT - (int64) INSTR_TIME_GET_MILLISEC(cur_time); - if (delay <= 0) - { - elog(WARNING, "pg_query_state: failed to receive request from leader"); - DetachPeer(); - return; - } - CHECK_FOR_INTERRUPTS(); - ResetLatch(MyLatch); - } - LockShmem(&tag, PG_QS_SND_KEY); elog(DEBUG1, "Worker %d receives pg_query_state request from %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid); mqh = shm_mq_attach(mq, NULL, NULL); - if (reqid != params->reqid || shm_mq_get_sender(mq) != MyProc) + if (shm_mq_get_sender(mq) != MyProc || params->reason != QueryStatePollReason) { - UnlockShmem(&tag); - return; + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); + goto connection_cleanup; } /* check if module is enabled */ if (!pg_qs_enable) { - shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED }; 
+ shm_mq_msg msg = { *mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, STAT_DISABLED }; if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED) goto connection_cleanup; @@ -286,7 +242,7 @@ SendQueryState(void) /* check if backend doesn't execute any query */ else if (list_length(QueryDescStack) == 0) { - shm_mq_msg msg = { reqid, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING }; + shm_mq_msg msg = { *mq_req_id, BASE_SIZEOF_SHM_MQ_MSG, MyProc, QUERY_NOT_RUNNING }; if(send_msg_by_parts(mqh, msg.length, &msg) != MSG_BY_PARTS_SUCCEEDED) goto connection_cleanup; @@ -299,7 +255,7 @@ SendQueryState(void) int msglen = sizeof(shm_mq_msg) + serialized_stack_length(qs_stack); shm_mq_msg *msg = palloc(msglen); - msg->reqid = reqid; + msg->reqid = *mq_req_id; msg->length = msglen; msg->proc = MyProc; msg->result_code = QS_RETURNED; @@ -313,24 +269,49 @@ SendQueryState(void) msg->stack_depth = list_length(qs_stack); serialize_stack(msg->stack, qs_stack); + list_free_deep(qs_stack); + if(send_msg_by_parts(mqh, msglen, msg) != MSG_BY_PARTS_SUCCEEDED) { elog(WARNING, "pg_query_state: peer seems to have detached"); + pfree(msg); goto connection_cleanup; } + else + pfree(msg); } elog(DEBUG1, "Worker %d sends response for pg_query_state to %d", shm_mq_get_sender(mq)->pid, shm_mq_get_receiver(mq)->pid); - DetachPeer(); + +connection_cleanup: +#if PG_VERSION_NUM < 100000 + shm_mq_detach(mq); +#else + shm_mq_detach(mqh); +#endif UnlockShmem(&tag); +} - return; +void +SendCurrentUserId(void) +{ + shm_mq_handle *mqh; + shm_mq_userid_msg msg; + LOCKTAG tag; + + msg.userid = GetUserId(); + LockShmem(&tag, PG_QS_SND_KEY); + + mqh = shm_mq_attach(mq, NULL, NULL); + msg.reqid = *mq_req_id; + if (shm_mq_get_sender(mq) != MyProc || params->reason != UserIdPollReason) + elog(WARNING, "could not send message queue to shared-memory queue: receiver has been interrupted and new request is being processed now."); + else if (send_msg_by_parts(mqh, sizeof(msg), &msg) != 
MSG_BY_PARTS_SUCCEEDED) + elog(WARNING, "could not send message queue to shared-memory queue."); -connection_cleanup: #if PG_VERSION_NUM < 100000 shm_mq_detach(mq); #else shm_mq_detach(mqh); #endif - DetachPeer(); UnlockShmem(&tag); }